A simple interface to external downloading processes, with
limited support for transparent retries and mirrorlist cycling.
---
yum/yumRepo.py | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 79 insertions(+), 0 deletions(-)
diff --git a/yum/yumRepo.py b/yum/yumRepo.py
index e5e9ece..e1ec8e8 100644
--- a/yum/yumRepo.py
+++ b/yum/yumRepo.py
@@ -48,6 +48,7 @@ import shutil
import stat
import errno
import tempfile
+import select, fcntl
# If you want yum to _always_ check the MD .sqlite files then set this to
# False (this doesn't affect .xml files or .sqilte files derived from them).
@@ -285,6 +286,7 @@ class YumRepository(Repository, config.RepoConf):
self._grabfunc = None
self._grab = None
+ self._down_jobs = {}
def __cmp__(self, other):
""" Sort yum repos. by cost, and then by alphanumeric on their id. """
@@ -866,6 +868,83 @@ class YumRepository(Repository, config.RepoConf):
size=package.size,
)
+ # Repositories with pending downloads.  This is a class attribute, so
+ # the one set is shared by all instances: getPackageAsync() adds the
+ # repo to it, and getPackageDone() walks it and then clears it.
+ _down_repos = set()
+
+    def getIdleProcess(self):
+        ''' Wait for one of this repository's download processes to
+            finish its current job, and return its pipe descriptors.
+
+        The error level of the finished download is read from the
+        process's pipe ('rfd').  A non-zero level is retried on the
+        next URL from 'self.urls', up to 'self.retries' times.  When
+        the result is final, the callback stored with the job is
+        called as checkfunc(po, error).
+
+        If the pipe yields EOF or something that is not a number, the
+        job is reported as failed with error -1 and is not retried.
+
+        Returns the (rfd, wfd) pair of the now idle process.  Its entry
+        is left in 'self._down_jobs'; the caller reuses or removes it.
+        '''
+        while True:
+            # iterating the dict yields its keys, the read descriptors
+            rfd = select.select(self._down_jobs, [], [])[0][0]
+            wfd, po, checkfunc, i = self._down_jobs[rfd]
+
+            # check error level
+            status = os.read(rfd, 4096)
+            try:
+                error = int(status)
+            except ValueError:
+                # EOF (process died) or garbage: there is nobody sane
+                # left to send a retry to, so fail this job right away
+                # instead of crashing before 'checkfunc' is called.
+                error = -1
+                break
+            if error == 0 or i == self.retries:
+                break
+
+            # move to other mirror
+            i += 1
+            url = self.urls[i % len(self.urls)] + po.relativepath
+            os.write(wfd, url + '\n')
+            self._down_jobs[rfd] = wfd, po, checkfunc, i
+
+        # call 'checkfunc'
+        checkfunc(po, error)
+        return rfd, wfd
+
+    def getPackageAsync(self, po, checkfunc, text=None, parallel=2):
+        ''' Start downloading package 'po'.
+
+        Sends the download request to an idle download process,
+        or creates a new such process.  Callback 'checkfunc' is
+        called as checkfunc(po, error) when the download finished.
+
+        'parallel' is the maximum number of download processes for
+        this repository; a true 'self.parallel' overrides it, and the
+        result is never less than 1.  When that many processes exist
+        already, this call blocks until one of them becomes idle.
+        'text' is accepted but currently unused.
+        '''
+
+        # use repo-specific value if provided
+        parallel = max(1, self.parallel or parallel)
+        if len(self._down_jobs) < parallel:
+            # new downloading process
+            A = os.pipe()
+            B = os.pipe()
+            if os.fork() == 0:
+                # child: stdin from B, stdout to A
+                try:
+                    os.close(B[1]); os.dup2(B[0], 0)
+                    os.close(A[0]); os.dup2(A[1], 1)
+                    os.chdir(self.pkgdir)
+                    # make a config option for this?
+                    os.execl('/usr/local/bin/yum-down.sh', '')
+                finally:
+                    # Only reached when chdir/exec failed.  The child
+                    # must never fall through into the parent's code,
+                    # so leave without running any cleanup handlers.
+                    os._exit(127)
+
+            # parent: read A, write B
+            os.close(A[1]); rfd = A[0]
+            os.close(B[0]); wfd = B[1]
+
+            # We fork/exec again later, and a later child that kept
+            # 'wfd' open would stop this one from seeing EOF when we
+            # close it.  'rfd' is marked as well so it does not leak.
+            for fd in (rfd, wfd):
+                f = fcntl.fcntl(fd, fcntl.F_GETFD)
+                fcntl.fcntl(fd, fcntl.F_SETFD, f | fcntl.FD_CLOEXEC)
+        else:
+            rfd, wfd = self.getIdleProcess()
+
+        # Send the request to selected process
+        os.write(wfd, self.urls[0] + po.relativepath + '\n')
+        self._down_jobs[rfd] = wfd, po, checkfunc, 0
+        self._down_repos.add(self)
+
+    @classmethod
+    def getPackageDone(cls):
+        ''' Block until every repository has finished downloading.
+
+        For each repository with pending downloads, its download
+        processes are collected one by one through getIdleProcess(),
+        dropped from the job table, and shut down by closing both
+        pipe ends and reaping a child.  The set of pending
+        repositories is emptied at the end.
+        '''
+
+        for repo in cls._down_repos:
+            while repo._down_jobs:
+                # take one finished process out of the job table
+                rfd, wfd = repo.getIdleProcess()
+                repo._down_jobs.pop(rfd)
+                # closing its pipes makes it exit; then wait for it
+                for fd in (rfd, wfd):
+                    os.close(fd)
+                os.wait()
+        cls._down_repos.clear()
+
def getHeader(self, package, checkfunc = None, reget = 'simple',
cache = True):
--
1.7.4.4
_______________________________________________
Yum-devel mailing list
[email protected]
http://lists.baseurl.org/mailman/listinfo/yum-devel