On Fri,  7 Nov 2014 00:45:55 -0800
Zac Medico <zmed...@gentoo.org> wrote:

> This adds support to generate a vdb_metadata_delta.json file
> which tracks package merges / unmerges that occur between updates to
> vdb_metadata.pickle. IndexedVardb can use the delta together with
> vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
> so that it can avoid expensive listdir calls in /var/db/pkg/*.
> Note that vdb_metadata.pickle is only updated periodically, in
> order to avoid excessive re-writes of a large file.
> 
> In order to test the performance gains from this patch, you need to
> generate /var/cache/edb/vdb_metadata_delta.json first, which will
> happen automatically if you run 'emerge -p anything' with root
> privileges.
> ---
>  pym/portage/dbapi/IndexedVardb.py |  35 ++++++++-
>  pym/portage/dbapi/vartree.py      | 161 +++++++++++++++++++++++++++++++++++---
>  2 files changed, 185 insertions(+), 11 deletions(-)
> 
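
Just to confirm my reading of the format: from the code below, the
delta file ends up holding a small json object: the cache format
"version", the "timestamp" shared with vdb_metadata.pickle, and a
list of "deltas" nodes appended by _cache_delta(). The package,
counter and timestamp values here are made up purely for
illustration:

    {
        "version": "1",
        "timestamp": 1415346355.0,
        "deltas": [
            {"event": "add", "package": "app-misc/foo",
             "version": "1.0", "slot": "0", "counter": "12345"}
        ]
    }
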
> diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
> index 424defc..e225ca1 100644
> --- a/pym/portage/dbapi/IndexedVardb.py
> +++ b/pym/portage/dbapi/IndexedVardb.py
> @@ -3,6 +3,7 @@
>  
>  import portage
>  from portage.dep import Atom
> +from portage.exception import InvalidData
>  from portage.versions import _pkg_str
>  
>  class IndexedVardb(object):
> @@ -42,7 +43,39 @@ class IndexedVardb(object):
>               if self._cp_map is not None:
>                       return iter(sorted(self._cp_map))
>  
> -             return self._iter_cp_all()
> +             cache_delta = self._vardb._cache_delta_load_race()
> +             if cache_delta is None:
> +                     return self._iter_cp_all()
> +
> +             packages = self._vardb._aux_cache["packages"]
> +             for delta in cache_delta["deltas"]:
> +                     cpv = delta["package"] + "-" + delta["version"]
> +                     event = delta["event"]
> +                     if event == "add":
> +                             # Use aux_get to populate the cache
> +                             # for this cpv.
> +                             if cpv not in packages:
> +                                     try:
> +                                             self._vardb.aux_get(cpv, ["DESCRIPTION"])
> +                                     except KeyError:
> +                                             pass
> +                     elif event == "remove":
> +                             packages.pop(cpv, None)
> +
> +             self._cp_map = cp_map = {}
> +             for cpv in packages:
> +                     try:
> +                             cpv = _pkg_str(cpv)
> +                     except InvalidData:
> +                             continue
> +
> +                     cp_list = cp_map.get(cpv.cp)
> +                     if cp_list is None:
> +                             cp_list = []
> +                             cp_map[cpv.cp] = cp_list
> +                     cp_list.append(cpv)
> +
> +             return iter(sorted(self._cp_map))
>  
>       def _iter_cp_all(self):
>               self._cp_map = cp_map = {}

looks good

> diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
> index 6ab4b92..fd4b099 100644
> --- a/pym/portage/dbapi/vartree.py
> +++ b/pym/portage/dbapi/vartree.py
> @@ -76,6 +76,7 @@ import gc
>  import grp
>  import io
>  from itertools import chain
> +import json
>  import logging
>  import os as _os
>  import platform
> @@ -109,6 +110,7 @@ class vardbapi(dbapi):
>               "|".join(_excluded_dirs) + r')$')
>  
>       _aux_cache_version        = "1"
> +     _aux_cache_delta_version  = "1"
>       _owners_cache_version     = "1"
>  
>       # Number of uncached packages to trigger cache update, since
> @@ -177,6 +179,8 @@ class vardbapi(dbapi):
>               self._aux_cache_obj = None
>               self._aux_cache_filename = os.path.join(self._eroot,
>                       CACHE_PATH, "vdb_metadata.pickle")
> +             self._cache_delta_filename = os.path.join(self._eroot,
> +                     CACHE_PATH, "vdb_metadata_delta.json")
>               self._counter_path = os.path.join(self._eroot,
>                       CACHE_PATH, "counter")
>  
> @@ -511,6 +515,120 @@ class vardbapi(dbapi):
>               self.cpcache.pop(pkg_dblink.mysplit[0], None)
>               dircache.pop(pkg_dblink.dbcatdir, None)
>  

I would like to see the following code as an independent class, in its
own file if possible, and just instantiated here in the main vardbapi.
Looking over the code, I didn't see much use of other class functions.
This class is already too large in many ways. Also, is there a
possibility this code could be re-used as a generic delta cache
anywhere else?

Another possibility is moving this code and the aux_cache code into a
separate class that vardbapi also subclasses. That would move all of
the cache code into a small class that is easily viewed, edited, and
maintained.

This file is already 5k+ LOC, and it is primarily the vardbapi class.
> +     def _cache_delta(self, event, cpv, slot, counter):
> +
> +             self.lock()
> +             try:
> +                     deltas_obj = self._cache_delta_load()
> +
> +                     if deltas_obj is None:
> +                             # We can't record meaningful deltas without
> +                             # a pre-existing state.
> +                             return
> +
> +                     delta_node = {
> +                             "event": event,
> +                             "package": cpv.cp,
> +                             "version": cpv.version,
> +                             "slot": slot,
> +                             "counter": "%s" % counter
> +                     }
> +
> +                     deltas_obj["deltas"].append(delta_node)
> +
> +                     # Eliminate earlier nodes cancelled out by later nodes
> +                     # that have identical package and slot attributes.
> +                     filtered_list = []
> +                     slot_keys = set()
> +                     version_keys = set()
> +                     for delta_node in reversed(deltas_obj["deltas"]):
> +                             slot_key = (delta_node["package"],
> +                                     delta_node["slot"])
> +                             version_key = (delta_node["package"],
> +                                     delta_node["version"])
> +                             if not (slot_key in slot_keys or \
> +                                     version_key in version_keys):
> +                                     filtered_list.append(delta_node)
> +                                     slot_keys.add(slot_key)
> +                                     version_keys.add(version_key)
> +
> +                     filtered_list.reverse()
> +                     deltas_obj["deltas"] = filtered_list
> +
> +                     f = atomic_ofstream(self._cache_delta_filename,
> +                             mode='w', encoding=_encodings['repo.content'])
> +                     json.dump(deltas_obj, f, ensure_ascii=False)
> +                     f.close()
> +
> +             finally:
> +                     self.unlock()
> +
> +     def _cache_delta_load(self):
> +
> +             if not os.path.exists(self._aux_cache_filename):
> +                     # If the primary cache doesn't exist yet, then
> +                     # we can't record a delta against it.
> +                     return None
> +
> +             try:
> +                     with io.open(self._cache_delta_filename, 'r',
> +                             encoding=_encodings['repo.content'],
> +                             errors='strict') as f:
> +                             cache_obj = json.load(f)
> +             except EnvironmentError as e:
> +                     if e.errno not in (errno.ENOENT, errno.ESTALE):
> +                             raise
> +             except (SystemExit, KeyboardInterrupt):
> +                     raise
> +             except Exception:
> +                     # Corrupt, or not json format.
> +                     pass
> +             else:
> +                     try:
> +                             version = cache_obj["version"]
> +                     except KeyError:
> +                             pass
> +                     else:
> +                             # If the timestamp recorded in the deltas file
> +                             # doesn't match aux_cache_timestamp, then the
> +                             # deltas are not valid. This means that deltas
> +                             # cannot be recorded until after the next
> +                             # vdb_metadata.pickle update, in order to
> +                             # guarantee consistency.
> +                             if version == self._aux_cache_delta_version:
> +                                     try:
> +                                             deltas = cache_obj["deltas"]
> +                                     except KeyError:
> +                                             cache_obj["deltas"] = deltas = []
> +
> +                                     if isinstance(deltas, list):
> +                                             return cache_obj
> +
> +             return None
> +
> +     def _cache_delta_load_race(self):
> +             """
> +             This calls _cache_delta_load and validates the timestamp
> +             against the currently loaded _aux_cache. If a concurrent
> +             update causes the timestamps to be inconsistent, then
> +             it reloads the caches and tries one more time before
> +             it aborts. In practice, the race is very unlikely, so
> +             this will usually succeed on the first try.
> +             """
> +
> +             tries = 2
> +             while tries:
> +                     tries -= 1
> +                     cache_delta = self._cache_delta_load()
> +                     if cache_delta is not None and \
> +                             cache_delta.get("timestamp") != \
> +                             self._aux_cache.get("timestamp", False):
> +                             self._aux_cache_obj = None
> +                     else:
> +                             return cache_delta
> +
> +             return None
> +
>       def match(self, origdep, use_cache=1):
>               "caching match function"
>               mydep = dep_expand(
> @@ -556,22 +674,37 @@ class vardbapi(dbapi):
>               long as at least part of the cache is still valid)."""
>               if self._flush_cache_enabled and \
>                       self._aux_cache is not None and \
> -                     len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
> -                     secpass >= 2:
> +                     secpass >= 2 and \
> +                     (len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
> +                     not os.path.exists(self._cache_delta_filename)):
> +
> +                     ensure_dirs(os.path.dirname(self._aux_cache_filename))
> +
>                       self._owners.populate() # index any unindexed contents
>                       valid_nodes = set(self.cpv_all())
>                       for cpv in list(self._aux_cache["packages"]):
>                               if cpv not in valid_nodes:
>                                       del self._aux_cache["packages"][cpv]
>                       del self._aux_cache["modified"]
> -                     try:
> -                             f = atomic_ofstream(self._aux_cache_filename, 'wb')
> -                             pickle.dump(self._aux_cache, f, protocol=2)
> -                             f.close()
> -                             apply_secpass_permissions(
> -                                     self._aux_cache_filename, gid=portage_gid, mode=0o644)
> -                     except (IOError, OSError) as e:
> -                             pass
> +                     timestamp = time.time()
> +                     self._aux_cache["timestamp"] = timestamp
> +
> +                     f = atomic_ofstream(self._aux_cache_filename, 'wb')
> +                     pickle.dump(self._aux_cache, f, protocol=2)
> +                     f.close()
> +                     apply_secpass_permissions(
> +                             self._aux_cache_filename, mode=0o644)
> +
> +                     f = atomic_ofstream(self._cache_delta_filename, 'w',
> +                             encoding=_encodings['repo.content'], errors='strict')
> +                     json.dump({
> +                             "version": self._aux_cache_delta_version,
> +                             "timestamp": timestamp
> +                             }, f, ensure_ascii=False)
> +                     f.close()
> +                     apply_secpass_permissions(
> +                             self._cache_delta_filename, mode=0o644)
> +
>                       self._aux_cache["modified"] = set()
>  
>       @property
> @@ -1590,6 +1723,12 @@ class dblink(object):
>                               self.dbdir, noiselevel=-1)
>                       return
>  
> +             if self.dbdir is self.dbpkgdir:
> +                     counter, = self.vartree.dbapi.aux_get(
> +                             self.mycpv, ["COUNTER"])
> +                     self.vartree.dbapi._cache_delta("remove", self.mycpv,
> +                             self.settings["SLOT"].split("/")[0], counter)
> +
>               shutil.rmtree(self.dbdir)
>               # If empty, remove parent category directory.
>               try:
> @@ -4196,6 +4335,8 @@ class dblink(object):
>                       self.delete()
>                       _movefile(self.dbtmpdir, self.dbpkgdir,
>                               mysettings=self.settings)
>                       self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
> +                     self.vartree.dbapi._cache_delta("add",
> +                             self.mycpv, slot, counter)
>               finally:
>                       self.unlockdb()
>  



-- 
Brian Dolbec <dolsen>

