Add a wrapper around the code that touches curl.
(preparing for an external downloader)
---
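For readers who don't live in this code: the snippet below sketches, outside
urlgrabber, roughly the pycurl multi-interface pattern that the new
_DirectDownloader class encapsulates. It is illustrative only; fetch_all and
the url-to-filename dict are invented for the example, and the grabber-layer
retry, checkfunc, and progress logic is elided.

    import os
    import pycurl

    def fetch_all(jobs):
        '''jobs: dict mapping url -> local filename (invented for this sketch).'''
        multi = pycurl.CurlMulti()
        running = {}
        for url, path in jobs.items():
            fo = open(path, 'wb')
            curl = pycurl.Curl()
            curl.setopt(pycurl.URL, url)
            curl.setopt(pycurl.WRITEDATA, fo)
            running[curl] = (fo, path)
            multi.add_handle(curl)
        errors = {}
        try:
            while running:
                # let curl move data until it has nothing to do right now
                while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
                    pass
                # reap transfers that completed since the last call
                _, finished, failed = multi.info_read()
                for curl in finished:
                    multi.remove_handle(curl)
                    fo, path = running.pop(curl)
                    fo.close()
                for curl, code, msg in failed:
                    multi.remove_handle(curl)
                    fo, path = running.pop(curl)
                    fo.close()
                    errors[path] = pycurl.error(code, msg)
                # block until some handle is ready instead of spinning
                if running:
                    multi.select(1.0)
        finally:
            # like _DirectDownloader.abort(): drop anything still in flight
            while running:
                curl, (fo, path) = running.popitem()
                multi.remove_handle(curl)
                fo.close()
                os.unlink(path)
        return errors

Note that info_read() reports failures as (handle, errno, errmsg) tuples; both
the old inline code and the new wrapper rebuild a pycurl.error from them so the
existing _do_perform_exc() path can convert them into URLGrabError.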
urlgrabber/grabber.py | 80 +++++++++++++++++++++++++++++-------------------
1 files changed, 48 insertions(+), 32 deletions(-)
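And this is roughly the shape the parallel_wait() loop takes once the curl
plumbing is hidden behind the wrapper (a hypothetical driver: 'requests' is a
stand-in work list, and the retry and meter handling is elided):

    dl = _DirectDownloader()
    try:
        for opts in requests:        # one curl handle per job
            dl.start(opts)
        while dl.running:            # until every handle has been reaped
            for opts, ug_err, amount_read in dl.perform():
                if ug_err is None:
                    print('done: %s (%i bytes)' % (opts.url, amount_read))
                else:
                    print('failed: %s: %s' % (opts.url, ug_err))
    finally:
        dl.abort()                   # close and unlink partial downloads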
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 644b431..b64c943 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1858,6 +1858,47 @@ class _AsyncCurlFile(PyCurlFileObject):
self._set_opts()
self._do_open_fo() # open the file but don't grab

+class _DirectDownloader:
+ def __init__(self):
+ ''' A downloader context.
+ '''
+ self.running = {}
+ self.multi = pycurl.CurlMulti()
+
+ def start(self, opts):
+ ''' Start download of job 'opts'
+ '''
+ fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+ self.running[fo.curl_obj] = fo
+ self.multi.add_handle(fo.curl_obj)
+
+ def perform(self):
+ ''' Run downloads, return finished ones.
+ '''
+ while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+ pass
+ ret = []
+ _, finished, failed = self.multi.info_read()
+ for curl in finished + failed:
+ curl_err = None
+ if type(curl) == tuple:
+ curl, code, msg = curl
+ curl_err = pycurl.error(code, msg)
+ self.multi.remove_handle(curl)
+ fo = self.running.pop(curl)
+ try: ug_err = None; fo._do_perform_exc(curl_err)
+ except URLGrabError, ug_err: pass
+ fo._do_close_fo()
+ ret.append((fo.opts, ug_err, fo._amount_read))
+ return ret
+
+ def abort(self):
+ while self.running:
+ curl, fo = self.running.popitem()
+ self.multi.remove_handle(curl)
+ fo._do_close_fo()
+ os.unlink(fo.opts.filename)
+
#####################################################################
# High level async API
@@ -1880,16 +1921,13 @@ def parallel_wait(meter = 'text'):
meter = TextMultiFileMeter()
meter.start(count, total)
- running = {}
- multi = pycurl.CurlMulti()
+ dl = _DirectDownloader()

    def start(opts, tries):
opts.tries = tries
opts.progress_obj = meter and meter.newMeter()
if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry,
opts.url)
- fo = _AsyncCurlFile(opts.url, opts.filename, opts)
- running[fo.curl_obj] = fo
- multi.add_handle(fo.curl_obj)
+ dl.start(opts)

    def start_next(opts):
key, limit = opts.async
@@ -1898,25 +1936,7 @@ def parallel_wait(meter = 'text'):
start(queue[pos], 1)

    def perform():
- while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
- pass
- q, finished, failed = multi.info_read()
- for curl in finished + failed:
-
- # curl layer
- curl_err = None
- if type(curl) == tuple:
- curl, code, msg = curl
- curl_err = pycurl.error(code, msg)
- multi.remove_handle(curl)
-
- # grabber layer
- fo = running.pop(curl); opts = fo.opts
- try: ug_err = None; fo._do_perform_exc(curl_err)
- except URLGrabError, ug_err: pass
- fo._do_close_fo()
-
- # do progress before callbacks to show retries
+ for opts, ug_err, _amount_read in dl.perform():
if meter:
m = opts.progress_obj
m.basename = os.path.basename(opts.filename)
@@ -1924,8 +1944,8 @@ def parallel_wait(meter = 'text'):
m.failure(ug_err.args[1])
else:
# file size might have changed
- meter.re.total += fo._amount_read - opts.size
- m.end(fo._amount_read)
+ meter.re.total += _amount_read - opts.size
+ m.end(_amount_read)
meter.removeMeter(m)

        if ug_err is None:
@@ -1987,15 +2007,11 @@ def parallel_wait(meter = 'text'):
start(opts, 1)
# now 'limit' is used as 'pos', index
# of the first request not started yet.
- while running:
+ while dl.running:
perform()

    finally:
- while running:
- curl, fo = running.popitem()
- multi.remove_handle(curl)
- fo._do_close_fo()
- os.unlink(fo.opts.filename)
+ dl.abort()
_async.clear()
if meter:
meter.end()
--
1.7.4.4