opts.async = (key, limit):
  async urlgrab() with connection limiting.
parallel_wait():
  wait until all queued grabs have finished.
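
A minimal usage sketch; the 'downloads' list, the 'repo' key and the limit
of 5 are illustrative, not part of this patch:

    from urlgrabber.grabber import urlgrab, parallel_wait

    for url, name in downloads:
        # queued instead of downloaded; returns immediately
        urlgrab(url, filename=name, async=('repo', 5))
    # process the queue, at most 5 connections for the 'repo' group
    parallel_wait()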
---
urlgrabber/grabber.py | 169 +++++++++++++++++++++++++++++++++++++++++++++++++
urlgrabber/mirror.py | 4 +
2 files changed, 173 insertions(+), 0 deletions(-)
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index a7c847b..644b431 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -259,6 +259,12 @@ GENERAL ARGUMENTS (kwargs)
What type of name to IP resolving to use, default is to do both IPV4 and
IPV6.
+ async = (key, limit)
+
+ When this option is set, the urlgrab() is not processed immediately
+ but queued. parallel_wait() then processes grabs in parallel, limiting
+ the number of connections in each 'key' group to at most 'limit'.
+
RETRY RELATED ARGUMENTS
@@ -882,6 +888,7 @@ class URLGrabberOptions:
self.size = None # if we know how big the thing we're getting is going
                 # to be. this is ultimately a MAXIMUM size for the file
self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+ self.async = None # blocking by default
def __repr__(self):
return self.format()
@@ -1019,6 +1026,15 @@ class URLGrabber(object):
_callback(opts.checkfunc, obj)
return path
+ if opts.async:
+ opts.url = url
+ opts.filename = filename
+ opts.size = int(opts.size or 0)
+ key, limit = opts.async
+ limit, queue = _async.setdefault(key, [limit, []])
+ queue.append(opts)
+ return filename
+
def retryfunc(opts, url, filename):
fo = PyCurlFileObject(url, filename, opts)
try:
@@ -1833,6 +1849,159 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
#####################################################################
+# Downloader
+#####################################################################
+
+class _AsyncCurlFile(PyCurlFileObject):
+ def _do_open(self):
+ self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+ self._set_opts()
+ self._do_open_fo() # open the file but don't grab
+
+
+#####################################################################
+# High level async API
+#####################################################################
+
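+# Pending async requests: maps each 'key' to a two-item list [limit, queue].
+# Once parallel_wait() starts the initial batch, the first item is reused as
+# the index of the first request in 'queue' that has not been started yet.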
+_async = {}
+
+def parallel_wait(meter = 'text'):
+ '''Process queued requests in parallel.
+ '''
+
+ if meter:
+ count = total = 0
+ for limit, queue in _async.values():
+ for opts in queue:
+ count += 1
+ total += opts.size
+ if meter == 'text':
+ from progress import TextMultiFileMeter
+ meter = TextMultiFileMeter()
+ meter.start(count, total)
+
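+ # 'running' maps each active curl handle to its _AsyncCurlFile wrapper;
+ # a single CurlMulti object drives all transfers.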
+ running = {}
+ multi = pycurl.CurlMulti()
+
+ def start(opts, tries):
+ opts.tries = tries
+ opts.progress_obj = meter and meter.newMeter()
+ if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+ fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+ running[fo.curl_obj] = fo
+ multi.add_handle(fo.curl_obj)
+
+ def start_next(opts):
+ key, limit = opts.async
+ pos, queue = _async[key]; _async[key][0] += 1
+ if pos < len(queue):
+ start(queue[pos], 1)
+
+ def perform():
+ while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+ pass
+ q, finished, failed = multi.info_read()
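+ # info_read() returns (num_queued, ok_list, err_list); err_list entries
+ # are (curl, errno, errmsg) tuples rather than bare handles.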
+ for curl in finished + failed:
+
+ # curl layer
+ curl_err = None
+ if type(curl) == tuple:
+ curl, code, msg = curl
+ curl_err = pycurl.error(code, msg)
+ multi.remove_handle(curl)
+
+ # grabber layer
+ fo = running.pop(curl); opts = fo.opts
+ try: ug_err = None; fo._do_perform_exc(curl_err)
+ except URLGrabError, ug_err: pass
+ fo._do_close_fo()
+
+ # do progress before callbacks to show retries
+ if meter:
+ m = opts.progress_obj
+ m.basename = os.path.basename(opts.filename)
+ if ug_err:
+ m.failure(ug_err.args[1])
+ else:
+ # file size might have changed
+ meter.re.total += fo._amount_read - opts.size
+ m.end(fo._amount_read)
+ meter.removeMeter(m)
+
+ if ug_err is None:
+ if DEBUG: DEBUG.info('success')
+ if opts.checkfunc:
+ try: _callback(opts.checkfunc, opts)
+ except URLGrabError, ug_err:
+ if meter:
+ meter.numfiles += 1
+ meter.re.total += opts.size
+ if ug_err is None:
+ start_next(opts)
+ continue
+
+ if DEBUG: DEBUG.info('failure: %s', ug_err)
+ retry = opts.retry or 0
+ if opts.failure_callback:
+ opts.exception = ug_err
+ try: _callback(opts.failure_callback, opts)
+ except URLGrabError, ug_err: retry = 0 # no retries
+ if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+ start(opts, opts.tries + 1) # simple retry
+ continue
+ start_next(opts)
+
+ if hasattr(opts, 'mirror_group'):
+ mg, gr, mirrorchoice = opts.mirror_group
+ opts.exception = ug_err
+ opts.mirror = mirrorchoice['mirror']
+ opts.relative_url = gr.url
+ try:
+ mg._failure(gr, opts)
+ mirrorchoice = mg._get_mirror(gr)
+ opts.mirror_group = mg, gr, mirrorchoice
+ except URLGrabError, ug_err: pass
+ else:
+ # use new mirrorchoice
+ key = mirrorchoice['mirror']
+ limit = mirrorchoice.get('max_connections') or 3
+ opts.async = key, limit
+ opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+
+ # add request to the new queue
+ pos, queue = _async.setdefault(key, [limit, []])
+ queue[pos:pos] = [opts] # inserting at head
+ if len(queue) <= pos:
+ start(opts, 1)
+ continue
+
+ # urlgrab failed
+ if not opts.failfunc:
+ raise ug_err
+ opts.exception = ug_err
+ _callback(opts.failfunc, opts)
+
+ try:
+ for limit, queue in _async.values():
+ for opts in queue[:limit]:
+ start(opts, 1)
+ # now 'limit' is used as 'pos', index
+ # of the first request not started yet.
+ while running:
+ perform()
+
+ finally:
+ while running:
+ curl, fo = running.popitem()
+ multi.remove_handle(curl)
+ fo._do_close_fo()
+ os.unlink(fo.opts.filename)
+ _async.clear()
+ if meter:
+ meter.end()
+
+
+#####################################################################
# TESTING
def _main_test():
try: url, filename = sys.argv[1:3]
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 5f0120f..47b6f89 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -396,6 +396,10 @@ class MirrorGroup:
# - blocking urlgrab() ignores failfunc
# - async urlgrab() can iterate mirrors
kwargs['mirror_group'] = self, gr, mirrorchoice
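+ # async grabs are grouped per mirror: each mirror gets at most
+ # its 'max_connections' (default 3) parallel connections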
+ if kw.get('async'):
+ key = mirrorchoice['mirror']
+ limit = mirrorchoice.get('max_connections') or 3
+ kwargs['async'] = key, limit
try:
return func_ref( *(fullurl,), **kwargs )
except URLGrabError, e:
--
1.7.4.4