This patch changes the strategy used by optimize. Instead of maintaining a 'checksum' field for every file, and metadata about whether each file is linked in the 'checksums' dir, it only ever hashes files whose size matches a new file's size _exactly_. A DS that never sees two files of identical size will never hash any file.
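As a minimal sketch of the idea (dedupe_candidates and data_paths are
hypothetical names, not part of the patch):

    import hashlib
    import os

    def dedupe_candidates(data_paths):
        # Group data files by exact size: a file whose size is unique
        # can never be a duplicate, so it is skipped without ever
        # being hashed.
        by_size = {}
        for path in data_paths:
            by_size.setdefault(os.stat(path).st_size, []).append(path)

        # Only size collisions are worth the cost of an md5 pass.
        by_csum = {}
        for paths in by_size.values():
            if len(paths) < 2:
                continue
            for path in paths:
                md5 = hashlib.md5()
                f = open(path, 'rb')
                try:
                    chunk = f.read(65536)
                    while chunk:
                        md5.update(chunk)
                        chunk = f.read(65536)
                finally:
                    f.close()
                by_csum.setdefault(md5.hexdigest(), []).append(path)

        # Each group of two or more paths holds true duplicates that
        # could be collapsed into hardlinks to a single inode.
        return [grp for grp in by_csum.values() if len(grp) > 1]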
- Allows the datastore to handle large files (downloads, video
  captures) without paying a high 'hashing' price.
- Removes the 'checksums' dirtree, which was redundant: we can tell
  whether two files are hardlinked by comparing their inodes.
- There are no users of the 'checksum' metadata field outside the DS,
  so we remove it completely.
- Removing the 'checksum' metadata and the 'checksums' dirtree means
  less risk of out-of-date metadata.

Caveats:

- Assumes no program is producing large files of identical size. On
  the XO, this is true. Some file formats with no compression could
  trigger unwelcome hashing (e.g. an activity that saves TIFF or BMP
  files).
- The optimizer should run in a separate process, to avoid making the
  DS unresponsive. The code was written to avoid races, but it would
  need to flock() each file between hashing it and hardlinking it;
  see the sketch below. I need to read up on forking a long-running
  process from a dbus service.

---

Background: reviewing the DS code and data structures, the checksums
dirtree and the strategy in optimize.py looked problematic to me. If
I am saving a large video, I don't want it hashed (an expensive
operation) if I can (cheaply) check that no other file has the same
size.

I could retain the 'checksum' metadata field -- and add some
assurances that it does not get stale. It would help with large and
identical-sized files, such as screenshots, which may otherwise end
up hashed more than once, perhaps many times.
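For reference, the flock() approach mentioned in the caveats might
look roughly like this hypothetical helper (replace_with_hardlink is
my name, not in the patch). It only sketches the idea: the lock is
advisory and does not cover writers that reopen the path afterwards.

    import fcntl
    import os

    def replace_with_hardlink(target_path, fpath):
        # Hold an exclusive advisory lock on the file about to be
        # replaced, so its content cannot change between hashing and
        # hardlinking.
        f = open(fpath, 'rb')
        try:
            fcntl.flock(f, fcntl.LOCK_EX)
            temp_path = fpath + '.tmp'
            if os.path.exists(temp_path):
                os.remove(temp_path)
            os.link(target_path, temp_path)
            # rename() is atomic on POSIX: readers see either the old
            # file or the hardlinked replacement, never a partial
            # state.
            os.rename(temp_path, fpath)
        finally:
            f.close()  # closing the descriptor releases the lock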
---
 src/carquinyol/datastore.py     |    4 -
 src/carquinyol/filestore.py     |   12 ---
 src/carquinyol/layoutmanager.py |    9 --
 src/carquinyol/metadatastore.py |    4 +-
 src/carquinyol/optimizer.py     |  225 ++++++++++++++++++---------------------
 5 files changed, 106 insertions(+), 148 deletions(-)

diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py
index a859dfe..0ee70d2 100644
--- a/src/carquinyol/datastore.py
+++ b/src/carquinyol/datastore.py
@@ -362,9 +362,6 @@ class DataStore(dbus.service.Object):
         self._metadata_store.store(uid, props)
         self._index_store.store(uid, props)
 
-        if os.path.exists(self._file_store.get_file_path(uid)) and \
-                (not file_path or os.path.exists(file_path)):
-            self._optimizer.remove(uid)
         self._file_store.store(uid, file_path, transfer_ownership,
                 lambda * args: self._update_completion_cb(async_cb,
                 async_err_cb,
@@ -489,7 +486,6 @@ class DataStore(dbus.service.Object):
         self._mark_dirty()
         try:
             entry_path = layoutmanager.get_instance().get_entry_path(uid)
-            self._optimizer.remove(uid)
             self._index_store.delete(uid)
             self._file_store.delete(uid)
             self._metadata_store.delete(uid)
diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py
index 0b34b69..38a4018 100644
--- a/src/carquinyol/filestore.py
+++ b/src/carquinyol/filestore.py
@@ -146,18 +146,6 @@ class FileStore(object):
         if os.path.exists(file_path):
             os.remove(file_path)
 
-    def hard_link_entry(self, new_uid, existing_uid):
-        existing_file = layoutmanager.get_instance().get_data_path(
-            existing_uid)
-        new_file = layoutmanager.get_instance().get_data_path(new_uid)
-
-        logging.debug('removing %r', new_file)
-        os.remove(new_file)
-
-        logging.debug('hard linking %r -> %r', new_file, existing_file)
-        os.link(existing_file, new_file)
-
-
 class AsyncCopy(object):
     """Copy a file in chunks in the idle loop.
 
diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py
index 3179a98..ee8270c 100644
--- a/src/carquinyol/layoutmanager.py
+++ b/src/carquinyol/layoutmanager.py
@@ -35,9 +35,6 @@ class LayoutManager(object):
         if not os.path.exists(self._root_path):
             os.makedirs(self._root_path)
 
-        self._create_if_needed(self.get_checksums_dir())
-        self._create_if_needed(self.get_queue_path())
-
     def _create_if_needed(self, path):
         if not os.path.exists(path):
             os.makedirs(path)
@@ -74,12 +71,6 @@ class LayoutManager(object):
     def get_index_path(self):
         return os.path.join(self._root_path, 'index')
 
-    def get_checksums_dir(self):
-        return os.path.join(self._root_path, 'checksums')
-
-    def get_queue_path(self):
-        return os.path.join(self.get_checksums_dir(), 'queue')
-
     def find_all(self):
         uids = []
         for f in os.listdir(self._root_path):
diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py
index 52cc10f..b6c2497 100644
--- a/src/carquinyol/metadatastore.py
+++ b/src/carquinyol/metadatastore.py
@@ -4,8 +4,6 @@ from carquinyol import layoutmanager
 from carquinyol import metadatareader
 
 MAX_SIZE = 256
-_INTERNAL_KEYS = ['checksum']
-
 
 
 class MetadataStore(object):
@@ -16,7 +14,7 @@ class MetadataStore(object):
         else:
             received_keys = metadata.keys()
             for key in os.listdir(metadata_path):
-                if key not in _INTERNAL_KEYS and key not in received_keys:
+                if key not in received_keys:
                     os.remove(os.path.join(metadata_path, key))
 
         metadata['uid'] = uid
diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py
index c038c2b..2129937 100644
--- a/src/carquinyol/optimizer.py
+++ b/src/carquinyol/optimizer.py
@@ -30,133 +30,118 @@ class Optimizer(object):
 
     def __init__(self, file_store, metadata_store):
         self._file_store = file_store
-        self._metadata_store = metadata_store
-        self._enqueue_checksum_id = None
+        self._root_path = layoutmanager.get_instance().get_root_path()
+        self._optimized_flag_path = os.path.join(self._root_path, 'optimized')
 
     def optimize(self, uid):
-        """Add an entry to a queue of entries to be checked for duplicates.
+        """Schedule an optimization.
 
         """
         if not os.path.exists(self._file_store.get_file_path(uid)):
             return
-
-        queue_path = layoutmanager.get_instance().get_queue_path()
-        open(os.path.join(queue_path, uid), 'w').close()
-        logging.debug('optimize %r', os.path.join(queue_path, uid))
-
-        if self._enqueue_checksum_id is None:
-            self._enqueue_checksum_id = \
-                gobject.idle_add(self._process_entry_cb,
-                                 priority=gobject.PRIORITY_LOW)
-
-    def remove(self, uid):
-        """Remove any structures left from space optimization
-
-        """
-        checksum = self._metadata_store.get_property(uid, 'checksum')
-        if checksum is None:
-            return
-
-        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
-        checksum_path = os.path.join(checksums_dir, checksum)
-        checksum_entry_path = os.path.join(checksum_path, uid)
-
-        if os.path.exists(checksum_entry_path):
-            logging.debug('remove %r', checksum_entry_path)
-            os.remove(checksum_entry_path)
-
-        if os.path.exists(checksum_path):
-            try:
-                os.rmdir(checksum_path)
-                logging.debug('removed %r', checksum_path)
-            except OSError, e:
-                if e.errno != errno.ENOTEMPTY:
-                    raise
-
-    def _identical_file_already_exists(self, checksum):
-        """Check if we already have files with this checksum.
- - """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - return os.path.exists(checksum_path) - - def _get_uid_from_checksum(self, checksum): - """Get an existing entry which file matches checksum. - - """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - first_uid = os.listdir(checksum_path)[0] - return first_uid - - def _create_checksum_dir(self, checksum): - """Create directory that tracks files with this same checksum. - - """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - logging.debug('create dir %r', checksum_path) - os.mkdir(checksum_path) - - def _add_checksum_entry(self, uid, checksum): - """Create a file in the checksum dir with the uid of the entry - - """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - - logging.debug('touch %r', os.path.join(checksum_path, uid)) - open(os.path.join(checksum_path, uid), 'w').close() - - def _already_linked(self, uid, checksum): - """Check if this entry's file is already a hard link to the checksums - dir. - - """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - return os.path.exists(os.path.join(checksum_path, uid)) - - def _process_entry_cb(self): - """Process one item in the checksums queue by calculating its checksum, - checking if there exist already an identical file, and in that case - substituting its file with a hard link to that pre-existing file. - + logging.debug('schedule optimize') + gobject.idle_add(self._deferred_optimize_cb, + priority=gobject.PRIORITY_LOW) + + def _deferred_optimize_cb(self): + """Optimize trying to avoid running costly md5sum. + We query the stat of data files to find + file pairs of the same size, and we check they + are not already linked reading the inode. + We keep a flag indicating last run, we look for + files newer than the flag, to avoid re-hashing + old file-pairs that don't match. + This seems like a lot of work, but it's done + on dirents which are in the kernel cache, so + it is virtually free. + The big win comes from never hashing large files. + In the rare case where two large files are exactly + the same size, there's good cause to hash them. + FIXME: fork to a bg process. 
""" - queue_path = layoutmanager.get_instance().get_queue_path() - queue = os.listdir(queue_path) - if queue: - uid = queue[0] - logging.debug('_process_entry_cb processing %r', uid) - - file_in_entry_path = self._file_store.get_file_path(uid) - if not os.path.exists(file_in_entry_path): - logging.info('non-existent entry in queue: %r', uid) - else: - checksum = self._calculate_md5sum(file_in_entry_path) - self._metadata_store.set_property(uid, 'checksum', checksum) - - if self._identical_file_already_exists(checksum): - if not self._already_linked(uid, checksum): - existing_entry_uid = \ - self._get_uid_from_checksum(checksum) - - self._file_store.hard_link_entry(uid, - existing_entry_uid) - - self._add_checksum_entry(uid, checksum) - else: - self._create_checksum_dir(checksum) - self._add_checksum_entry(uid, checksum) - - os.remove(os.path.join(queue_path, uid)) - - if len(queue) <= 1: - self._enqueue_checksum_id = None - return False - else: - return True + logging.debug('_deferred_optimize_cb') + last_optimized = 0 + if os.path.exists(self._optimized_flag_path): + last_optimized = os.stat(self._optimized_flag_path).st_mtime + # touch it now, to win races + f = open(self._optimized_flag_path, 'w') + f.write('.') + f.close() + + datafiles = subprocess.check_output(['find', self._root_path, + '-type' , 'f', + '-name', 'data', + '-mindepth', '3', + '-maxdepth', '3', + '-print0']) + by_size = {} + recent = [] + for fpath in datafiles.split('\0'): + if not fpath: + continue + s = os.stat(fpath) + dfile = [fpath, s] + if s.st_mtime > last_optimized: + recent.append(dfile) + if not by_size.has_key(s.st_size): + by_size[s.st_size] = [] + by_size[s.st_size].append(dfile) + + sz_seen = [] + for dfile in recent: + # only worth md5summing files same size + # only evaluate each given size once + if dfile[1].st_size in sz_seen: + continue + sz_seen.append(dfile[1].st_size) + + candidates = by_size[dfile[1].st_size] + if len(candidates) < 2: + # only file this size + continue + + # avoid csum of already hardlinked files + by_inode = {} + for cdate in candidates: + inode = cdate[1].st_ino + if not by_inode.has_key(inode): + by_inode[inode] = [] + by_inode[inode].append(cdate) + if len(by_inode.keys()) == 1: + # hardlinked already + continue + + by_csum = {} + # cluster by csum. + # only csum once per inode + for inode in by_inode.keys(): + # use fpath from the first entry + fpath = by_inode[inode][0][0] + logging.debug('optimize: md5sum(%s)' % fpath) + csum = self._calculate_md5sum(fpath) + logging.debug('optimize: md5sum(%s) done' % fpath) + if not by_csum.has_key(csum): + by_csum[csum] = [] + by_csum[csum].append(inode) + + for csum in by_csum.keys(): + if len(by_csum[csum]) == 1: + continue + fpath_grp = [] + for inode in by_csum[csum]: + for cdate in by_inode[inode]: + fpath_grp.append(cdate[0]) + target_path=fpath_grp.pop() + for fpath in fpath_grp: + logging.debug('hardlinking target %s from %s' % (target_path, fpath)) + temp_path = fpath+'.tmp' + if os.path.exists(temp_path): + os.remove(temp_path) + os.link(target_path, temp_path) + os.rename(temp_path, fpath) + + # indicates to the idle loop caller we're done + return False def _calculate_md5sum(self, path): """Calculate the md5 checksum of a given file. -- 1.7.10.4 _______________________________________________ Sugar-devel mailing list Sugar-devel@lists.sugarlabs.org http://lists.sugarlabs.org/listinfo/sugar-devel