Test via: yum up --downloadonly --setopt=deltarpm=1
---
yum/__init__.py | 4 +++
yum/drpm.py | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++----
2 files changed, 57 insertions(+), 5 deletions(-)
diff --git a/yum/__init__.py b/yum/__init__.py
index 4a491a6..54f7bb1 100644
--- a/yum/__init__.py
+++ b/yum/__init__.py
@@ -2304,6 +2304,9 @@ much more problems).
if isinstance(po, DeltaPackage):
presto.rebuild(po, adderror)
return
+ else:
+ presto.dequeue(block=False)
+
if po.repoid not in done_repos:
done_repos.add(po.repoid)
# Check a single package per. repo. ... to give a
hint to
@@ -2333,6 +2336,7 @@ much more problems).
adderror(po, exception2msg(e))
if async:
urlgrabber.grabber.parallel_wait()
+ presto.dequeue_all()
presto.wait()
if hasattr(urlgrabber.progress, 'text_meter_total_size'):
diff --git a/yum/drpm.py b/yum/drpm.py
index 8008618..ae1a516 100644
--- a/yum/drpm.py
+++ b/yum/drpm.py
@@ -88,6 +88,7 @@ class DeltaInfo:
def __init__(self, ayum, pkgs):
self.verbose_logger = ayum.verbose_logger
self.jobs = {}
+ self._future_jobs = []
self.limit = ayum.conf.deltarpm
if self.limit < 0:
nprocs = _num_cpus_online()
@@ -188,15 +189,33 @@ class DeltaInfo:
pkgs[index] = DeltaPackage(po, size, remote, csum,
oldrpm)
el.clear()
- def wait(self, limit = 1):
+ def wait(self, num=None):
+ if num is None:
+ num = len(self.jobs)
+
# wait for some jobs, run callbacks
- while len(self.jobs) >= limit:
- pid, code = os.wait()
+ while num > 0:
+ assert self.jobs
+ num -= self._wait(block=True)
+
+ def _wait(self, block=False):
+ num = 0
+
+ while self.jobs:
+ if block:
+ pid, code = os.wait()
+ else:
+ pid, code = os.waitpid(-1, os.WNOHANG)
+ if not pid:
+ break
+
# urlgrabber spawns child jobs, too. But they exit synchronously,
# so we should never see an unknown pid here.
assert pid in self.jobs
callback = self.jobs.pop(pid)
callback(code)
+ num += 1
+ return num
def rebuild(self, po, adderror):
# this runs when worker finishes
@@ -210,10 +229,39 @@ class DeltaInfo:
os.unlink(po.localpath)
po.localpath = po.rpm.localpath # for --downloadonly
- # spawn a worker process
- self.wait(self.limit)
args = ()
if po.oldrpm: args += '-r', po.oldrpm
args += po.localpath, po.rpm.localpath
+
+ self.queue(args, callback)
+ self.dequeue(block=False)
+
+ def queue(self, args, callback):
+ """ Queue a delta rebuild up. """
+ self._future_jobs.append((args, callback))
+
+ def dequeue_all(self):
+ """ De-Queue all delta rebuilds and spawn the rebuild processes. """
+
+ while self._future_jobs:
+ self.dequeue()
+
+ def dequeue(self, block=True):
+ """ Try to De-Queue a delta rebuild and spawn the rebuild process. """
+ # Do this here, just to keep the zombies at bay...
+ self._wait()
+
+ if not self._future_jobs:
+ return
+
+ if self.limit <= len(self.jobs):
+ if not block:
+ return
+ self.wait((self.limit - len(self.jobs)) + 1)
+
+ args, callback = self._future_jobs.pop(0)
+ self._spawn(args, callback)
+
+ def _spawn(self, args, callback):
pid = os.spawnl(os.P_NOWAIT, APPLYDELTA, APPLYDELTA, *args)
self.jobs[pid] = callback
--
1.7.7.6
_______________________________________________
Yum-devel mailing list
[email protected]
http://lists.baseurl.org/mailman/listinfo/yum-devel