On Fri, 7 Nov 2014 00:45:55 -0800 Zac Medico <zmed...@gentoo.org> wrote:
> This adds support to generate a vdb_metadata_delta.json file which
> tracks package merges / unmerges that occur between updates to
> vdb_metadata.pickle. IndexedVardb can use the delta together with
> vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
> so that it can avoid expensive listdir calls in /var/db/pkg/*.
> Note that vdb_metadata.pickle is only updated periodically, in
> order to avoid excessive re-writes of a large file.
>
> In order to test the performance gains from this patch, you need to
> generate /var/cache/edb/vdb_metadata_delta.json first, which will
> happen automatically if you run 'emerge -p anything' with root
> privileges.
> ---
>  pym/portage/dbapi/IndexedVardb.py |  35 ++++++++-
>  pym/portage/dbapi/vartree.py      | 161 +++++++++++++++++++++++++++++++++++---
>  2 files changed, 185 insertions(+), 11 deletions(-)
>
> diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
> index 424defc..e225ca1 100644
> --- a/pym/portage/dbapi/IndexedVardb.py
> +++ b/pym/portage/dbapi/IndexedVardb.py
> @@ -3,6 +3,7 @@
>
>  import portage
>  from portage.dep import Atom
> +from portage.exception import InvalidData
>  from portage.versions import _pkg_str
>
>  class IndexedVardb(object):
> @@ -42,7 +43,39 @@ class IndexedVardb(object):
>          if self._cp_map is not None:
>              return iter(sorted(self._cp_map))
>
> -        return self._iter_cp_all()
> +        cache_delta = self._vardb._cache_delta_load_race()
> +        if cache_delta is None:
> +            return self._iter_cp_all()
> +
> +        packages = self._vardb._aux_cache["packages"]
> +        for delta in cache_delta["deltas"]:
> +            cpv = delta["package"] + "-" + delta["version"]
> +            event = delta["event"]
> +            if event == "add":
> +                # Use aux_get to populate the cache
> +                # for this cpv.
> +                if cpv not in packages:
> +                    try:
> +                        self._vardb.aux_get(cpv, ["DESCRIPTION"])
> +                    except KeyError:
> +                        pass
> +            elif event == "remove":
> +                packages.pop(cpv, None)
> +
> +        self._cp_map = cp_map = {}
> +        for cpv in packages:
> +            try:
> +                cpv = _pkg_str(cpv)
> +            except InvalidData:
> +                continue
> +
> +            cp_list = cp_map.get(cpv.cp)
> +            if cp_list is None:
> +                cp_list = []
> +                cp_map[cpv.cp] = cp_list
> +            cp_list.append(cpv)
> +
> +        return iter(sorted(self._cp_map))
>
>      def _iter_cp_all(self):
>          self._cp_map = cp_map = {}

looks good

> diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
> index 6ab4b92..fd4b099 100644
> --- a/pym/portage/dbapi/vartree.py
> +++ b/pym/portage/dbapi/vartree.py
> @@ -76,6 +76,7 @@ import gc
>  import grp
>  import io
>  from itertools import chain
> +import json
>  import logging
>  import os as _os
>  import platform
> @@ -109,6 +110,7 @@ class vardbapi(dbapi):
>          "|".join(_excluded_dirs) + r')$')
>
>      _aux_cache_version = "1"
> +    _aux_cache_delta_version = "1"
>      _owners_cache_version = "1"
>
>      # Number of uncached packages to trigger cache update, since
> @@ -177,6 +179,8 @@ class vardbapi(dbapi):
>          self._aux_cache_obj = None
>          self._aux_cache_filename = os.path.join(self._eroot,
>              CACHE_PATH, "vdb_metadata.pickle")
> +        self._cache_delta_filename = os.path.join(self._eroot,
> +            CACHE_PATH, "vdb_metadata_delta.json")
>          self._counter_path = os.path.join(self._eroot,
>              CACHE_PATH, "counter")
>
> @@ -511,6 +515,120 @@ class vardbapi(dbapi):
>          self.cpcache.pop(pkg_dblink.mysplit[0], None)
>          dircache.pop(pkg_dblink.dbcatdir, None)
>

I would like to see the following code as an independent class, in its own file if possible, and just instantiated here in the main vardbapi. Looking over the code, I didn't see much use of other class functions, and this class is already too large in many ways. Is there also a possibility that this code could be re-used as a generic delta cache anywhere else? Another option is moving this code and the aux_cache code into a separate class that vardbapi subclasses; that would gather all the cache code into a small class that is easily viewed, edited, and maintained. This file is already 5k+ LOC, and it is primarily the vardbapi class.
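To make that concrete, here is a rough sketch of the shape such a class could take (a sketch only: the file name, class name, and method names below are my own invention, not part of the patch):

    # pym/portage/dbapi/_VdbMetadataDelta.py (hypothetical new file)

    import json

    from portage import _encodings
    from portage.util import atomic_ofstream

    class VdbMetadataDelta(object):
        """Owns vdb_metadata_delta.json; a vardbapi instance would
        hold one of these and delegate its _cache_delta* methods
        to it."""

        _format_version = "1"

        def __init__(self, vardb, filename):
            self._vardb = vardb
            self._filename = filename

        def initialize(self, timestamp):
            # Write a fresh, empty delta file that records the
            # timestamp of the vdb_metadata.pickle snapshot that it
            # is relative to.
            f = atomic_ofstream(self._filename, mode='w',
                encoding=_encodings['repo.content'], errors='strict')
            json.dump({
                "version": self._format_version,
                "timestamp": timestamp,
                "deltas": [],
            }, f, ensure_ascii=False)
            f.close()

        def load(self):
            # Body of _cache_delta_load below, reading self._filename
            # and validating its "version" and "deltas" fields.
            raise NotImplementedError

        def record_event(self, event, cpv, slot, counter):
            # Body of _cache_delta below, minus the lock()/unlock()
            # calls, which the vardbapi caller would keep.
            raise NotImplementedError

vardbapi._cache_delta() would then shrink to a lock()/record_event()/unlock() wrapper, and the flush code in flush_cache() would call self._delta.initialize(timestamp) instead of writing the json inline.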

> +    def _cache_delta(self, event, cpv, slot, counter):
> +
> +        self.lock()
> +        try:
> +            deltas_obj = self._cache_delta_load()
> +
> +            if deltas_obj is None:
> +                # We can't record meaningful deltas without
> +                # a pre-existing state.
> +                return
> +
> +            delta_node = {
> +                "event": event,
> +                "package": cpv.cp,
> +                "version": cpv.version,
> +                "slot": slot,
> +                "counter": "%s" % counter
> +            }
> +
> +            deltas_obj["deltas"].append(delta_node)
> +
> +            # Eliminate earlier nodes cancelled out by later nodes
> +            # that have identical package and slot attributes.
> +            filtered_list = []
> +            slot_keys = set()
> +            version_keys = set()
> +            for delta_node in reversed(deltas_obj["deltas"]):
> +                slot_key = (delta_node["package"],
> +                    delta_node["slot"])
> +                version_key = (delta_node["package"],
> +                    delta_node["version"])
> +                if not (slot_key in slot_keys or \
> +                    version_key in version_keys):
> +                    filtered_list.append(delta_node)
> +                    slot_keys.add(slot_key)
> +                    version_keys.add(version_key)
> +
> +            filtered_list.reverse()
> +            deltas_obj["deltas"] = filtered_list
> +
> +            f = atomic_ofstream(self._cache_delta_filename,
> +                mode='w', encoding=_encodings['repo.content'])
> +            json.dump(deltas_obj, f, ensure_ascii=False)
> +            f.close()
> +
> +        finally:
> +            self.unlock()
> +
> +    def _cache_delta_load(self):
> +
> +        if not os.path.exists(self._aux_cache_filename):
> +            # If the primary cache doesn't exist yet, then
> +            # we can't record a delta against it.
> +            return None
> +
> +        try:
> +            with io.open(self._cache_delta_filename, 'r',
> +                encoding=_encodings['repo.content'],
> +                errors='strict') as f:
> +                cache_obj = json.load(f)
> +        except EnvironmentError as e:
> +            if e.errno not in (errno.ENOENT, errno.ESTALE):
> +                raise
> +        except (SystemExit, KeyboardInterrupt):
> +            raise
> +        except Exception:
> +            # Corrupt, or not json format.
> +            pass
> +        else:
> +            try:
> +                version = cache_obj["version"]
> +            except KeyError:
> +                pass
> +            else:
> +                # If the timestamp recorded in the deltas file
> +                # doesn't match aux_cache_timestamp, then the
> +                # deltas are not valid. This means that deltas
> +                # cannot be recorded until after the next
> +                # vdb_metadata.pickle update, in order to
> +                # guarantee consistency.
> +                if version == self._aux_cache_delta_version:
> +                    try:
> +                        deltas = cache_obj["deltas"]
> +                    except KeyError:
> +                        cache_obj["deltas"] = deltas = []
> +
> +                    if isinstance(deltas, list):
> +                        return cache_obj
> +
> +        return None
> +
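A side note for anyone reading along: the compaction loop in _cache_delta above scans the list in reverse while keying on (package, slot) and (package, version), so only the most recent event per slot and per version survives. A tiny standalone illustration with made-up data:

    deltas = [
        {"event": "remove", "package": "dev-lang/python",
            "version": "3.3.5", "slot": "3.3", "counter": "101"},
        {"event": "add", "package": "dev-lang/python",
            "version": "3.4.2", "slot": "3.4", "counter": "102"},
        {"event": "add", "package": "dev-lang/python",
            "version": "3.4.3", "slot": "3.4", "counter": "103"},
    ]

    filtered_list = []
    slot_keys = set()
    version_keys = set()
    for delta_node in reversed(deltas):
        slot_key = (delta_node["package"], delta_node["slot"])
        version_key = (delta_node["package"], delta_node["version"])
        if not (slot_key in slot_keys or version_key in version_keys):
            filtered_list.append(delta_node)
            slot_keys.add(slot_key)
            version_keys.add(version_key)
    filtered_list.reverse()

    # The add of 3.4.2 is superseded by the later add of 3.4.3 in
    # the same slot, so only the 3.3.5 removal and the 3.4.3 add
    # survive.
    assert [d["version"] for d in filtered_list] == ["3.3.5", "3.4.3"]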
> + """ > + > + tries = 2 > + while tries: > + tries -= 1 > + cache_delta = self._cache_delta_load() > + if cache_delta is not None and \ > + cache_delta.get("timestamp") != \ > + self._aux_cache.get("timestamp", > False): > + self._aux_cache_obj = None > + else: > + return cache_delta > + > + return None > + > def match(self, origdep, use_cache=1): > "caching match function" > mydep = dep_expand( > @@ -556,22 +674,37 @@ class vardbapi(dbapi): > long as at least part of the cache is still > valid).""" if self._flush_cache_enabled and \ > self._aux_cache is not None and \ > - len(self._aux_cache["modified"]) >= > self._aux_cache_threshold and \ > - secpass >= 2: > + secpass >= 2 and \ > + (len(self._aux_cache["modified"]) >= > self._aux_cache_threshold or > + not > os.path.exists(self._cache_delta_filename)): + > + > ensure_dirs(os.path.dirname(self._aux_cache_filename)) + > self._owners.populate() # index any > unindexed contents valid_nodes = set(self.cpv_all()) > for cpv in list(self._aux_cache["packages"]): > if cpv not in valid_nodes: > del > self._aux_cache["packages"][cpv] del self._aux_cache["modified"] > - try: > - f = > atomic_ofstream(self._aux_cache_filename, 'wb') > - pickle.dump(self._aux_cache, f, > protocol=2) > - f.close() > - apply_secpass_permissions( > - self._aux_cache_filename, > gid=portage_gid, mode=0o644) > - except (IOError, OSError) as e: > - pass > + timestamp = time.time() > + self._aux_cache["timestamp"] = timestamp > + > + f = > atomic_ofstream(self._aux_cache_filename, 'wb') > + pickle.dump(self._aux_cache, f, protocol=2) > + f.close() > + apply_secpass_permissions( > + self._aux_cache_filename, mode=0o644) > + > + f = > atomic_ofstream(self._cache_delta_filename, 'w', > + encoding=_encodings['repo.content'], > errors='strict') > + json.dump({ > + "version": > self._aux_cache_delta_version, > + "timestamp": timestamp > + }, f, ensure_ascii=False) > + f.close() > + apply_secpass_permissions( > + self._cache_delta_filename, > mode=0o644) + > self._aux_cache["modified"] = set() > > @property > @@ -1590,6 +1723,12 @@ class dblink(object): > self.dbdir, noiselevel=-1) > return > > + if self.dbdir is self.dbpkgdir: > + counter, = self.vartree.dbapi.aux_get( > + self.mycpv, ["COUNTER"]) > + self.vartree.dbapi._cache_delta("remove", > self.mycpv, > + self.settings["SLOT"].split("/")[0], > counter) + > shutil.rmtree(self.dbdir) > # If empty, remove parent category directory. > try: > @@ -4196,6 +4335,8 @@ class dblink(object): > self.delete() > _movefile(self.dbtmpdir, self.dbpkgdir, > mysettings=self.settings) self._merged_path(self.dbpkgdir, > os.lstat(self.dbpkgdir)) > + self.vartree.dbapi._cache_delta("add", > + self.mycpv, slot, counter) > finally: > self.unlockdb() > -- Brian Dolbec <dolsen>