This is very similar to what we do in YumBase.downloadPkgs(). If urlgrabber supports parallel downloading and we have compatible Yum and repo._getFile(), just add async and failfunc arguments.
As _getFile(async=True) returns *before* the file is downloaded, most of the post-processing was being moved to checkfunc. Error handling is duplicated in failfunc, because we can delay a callback, but not raising an exception. To make this work, recent builds if yum and urlgrabber are needed http://lists.fedoraproject.org/pipermail/devel/2012-May/167122.html but we are still compatible with the old Yum, too. --- yum-presto/presto.py | 71 ++++++++++++++++++++++--------------------------- 1 files changed, 32 insertions(+), 39 deletions(-) diff --git a/yum-presto/presto.py b/yum-presto/presto.py index 0461c8e..cac5cdd 100644 --- a/yum-presto/presto.py +++ b/yum-presto/presto.py @@ -358,64 +358,57 @@ def downloadPkgs(conduit, presto, download_pkgs=None): # now we need to do downloads i = 0 - local_size = 0 + _size = [0, 0] # local_size, rebuild_size + async = hasattr(urlgrabber.grabber, 'parallel_wait') for (po, delta) in remote_pkgs: i += 1 - # FIXME: verifyChecksum should handle the urlgrabber objects... - checkfunc = (lambda fo, csumtype, csum: - conduit._base.verifyChecksum(fo.filename, csumtype, csum), - (delta['checksum_type'], - delta['checksum']), {}) - - deltadir = os.path.join(po.repo.cachedir, 'deltas') - deltapath = os.path.join(deltadir, - os.path.basename(delta['filename'])) - - # FIXME: this should be moved into _getFile - dirstat = os.statvfs(deltadir) - delta_size = delta['size'] - if (dirstat.f_bavail * dirstat.f_bsize) <= (long(po.size) + delta_size): - adderror(po, _('Insufficient space in download directory %s ' - 'to download') % (deltadir,)) - continue + text = os.path.basename(delta['filename']) + deltapath = os.path.join(po.repo.cachedir, 'deltas', text) + + def checkfunc(obj, d=delta, po=po, dp=deltapath): + # FIXME: verifyChecksum should handle the urlgrabber objects... + conduit._base.verifyChecksum(obj.filename, d['checksum_type'], d['checksum']) + _size[0] += d['size'] + _size[1] += po.size + if hasattr(urlgrabber.progress, 'text_meter_total_size'): + urlgrabber.progress.text_meter_total_size(remote_size, _size[0]) + + # Magic: What happens here is that the checksum data is loaded + # from the SQL db in this thread, so that the other thread can call + # .verifyLocalPkg() without having to hit sqlite. + po.returnChecksums() + queue.put((conduit, po, dp, d['size'])) + if po in errors: + del errors[po] - if hasattr(urlgrabber.progress, 'text_meter_total_size'): - urlgrabber.progress.text_meter_total_size(remote_size, local_size) + kwargs = {} + if async and getattr(po.repo, '_async', None): + kwargs['failfunc'] = lambda obj, po=po: adderror(po, str(obj.exception)) + kwargs['async'] = True + elif not (i == 1 and not _size[0] and remote_size == delta['size']): + text = '(%s/%s): %s' % (i, len(remote_pkgs), text) try: - if i == 1 and not local_size and remote_size == delta_size: - text = os.path.basename(delta['filename']) - else: - text = "(%s/%s): %s" % (i, len(remote_pkgs), - os.path.basename(delta['filename'])) deltafile = po.repo._getFile(url=po.basepath, relative=delta['filename'], local=deltapath, checkfunc=checkfunc, text=text, - cache=po.repo.cache) - local_size += delta_size - if hasattr(urlgrabber.progress, 'text_meter_total_size'): - urlgrabber.progress.text_meter_total_size(remote_size, - local_size) + size=delta['size'], + cache=po.repo.cache, + **kwargs) except yum.Errors.RepoError, e: adderror(po, str(e)) else: # HACK: Use the download progress, at least we get something cb = po.repo.callback - # Magic: What happens here is that the checksum data is loaded - # from the SQL db in this thread, so that the other thread can call - # .verifyLocalPkg() without having to hit sqlite. - po.returnChecksums() - queue.put((conduit, po, deltafile, delta['size'])) - rebuild_size += po.size - - if errors.has_key(po): - del errors[po] # Check for waiting messages from building threads while not messages.empty(): conduit.info(2, messages.get()) + if async: + urlgrabber.grabber.parallel_wait() + rebuild_size += _size[1] if hasattr(urlgrabber.progress, 'text_meter_total_size'): urlgrabber.progress.text_meter_total_size(0) ptsz, pfsz, pfnsz = _processing_data() -- 1.7.4.4 _______________________________________________ Yum-devel mailing list [email protected] http://lists.baseurl.org/mailman/listinfo/yum-devel
