default_grabber.opts.parallel:
  limit on concurrent connections.
  0 = always use blocking urlgrab()

parallel_begin():
  start queueing urlgrab() requests

parallel_end():
  process the queue in parallel
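
A rough usage sketch, assuming the API added below (the example URLs and
the note_failure callback are made up; failfunc is assumed to reach the
queued options through urlgrab() keyword arguments):

    from urlgrabber.grabber import default_grabber, urlgrab, \
        parallel_begin, parallel_end

    def note_failure(obj):
        # called only after retries/mirrors are exhausted;
        # obj.exception holds the final URLGrabError
        print 'download failed:', obj.url

    default_grabber.opts.parallel = 10   # connection limit (0 = blocking urlgrab)
    parallel_begin()                     # urlgrab() now only queues requests
    for url in ('http://example.com/a.rpm', 'http://example.com/b.rpm'):
        urlgrab(url, failfunc=note_failure)
    parallel_end()                       # download the queue, <= 10 at once

If failfunc is not set, parallel_end() re-raises the URLGrabError of the
failed download instead.
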
---
urlgrabber/grabber.py | 171 +++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 171 insertions(+), 0 deletions(-)
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 303428c..f0f4c6b 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -872,6 +872,7 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.parallel = 5 # max connections for parallel grabs
 
     def __repr__(self):
         return self.format()
@@ -1008,6 +1009,13 @@ class URLGrabber(object):
                     apply(cb_func, (obj, )+cb_args, cb_kwargs)
                 return path
 
+        if _async_on:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            _async_list.append(opts)
+            return filename
+
         def retryfunc(opts, url, filename):
             fo = PyCurlFileObject(url, filename, opts)
             try:
@@ -1827,6 +1835,169 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 #####################################################################
+# Helpers
+#####################################################################
+
+class _AsyncCurlFile(PyCurlFileObject):
+    def _do_open(self):
+        self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+        self._set_opts()
+        self._do_open_fo() # open the file but don't grab
+
+def _callback(cb, obj):
+    if callable(cb):
+        return cb(obj)
+    cb, arg, karg = cb
+    return cb(obj, *arg, **karg)
+
+#####################################################################
+# High level async API
+#####################################################################
+
+_async_on = False
+_async_list = []
+
+def parallel_begin():
+    '''Start queuing urlgrab() requests.
+    '''
+
+    if default_grabber.opts.parallel == 0:
+        return
+
+    global _async_on
+    assert _async_on == False
+    _async_on = True
+
+def parallel_end(meter = 'text'):
+    '''Process queued requests in parallel.
+    '''
+
+    if default_grabber.opts.parallel == 0:
+        return
+
+    global _async_on
+    assert _async_on == True
+    _async_on = False
+
+    global _async_list
+    if not _async_list: return
+    todo = _async_list; _async_list = []
+    limit = default_grabber.opts.parallel
+
+    if meter:
+        total = 0
+        for opts in todo:
+            total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()
+        meter.start(len(todo), total)
+
+    running = {}
+    multi = pycurl.CurlMulti()
+
+    def start(opts, tries):
+        opts.tries = tries
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+        opts.progress_obj = meter and meter.newMeter()
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        running[fo.curl_obj] = fo
+        multi.add_handle(fo.curl_obj)
+
+    def perform():
+        while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        q, finished, failed = multi.info_read()
+        for curl in finished + failed:
+
+            # curl layer
+            curl_err = None
+            if type(curl) == tuple:
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            multi.remove_handle(curl)
+
+            # grabber layer
+            fo = running.pop(curl); opts = fo.opts
+            try: ug_err = None; fo._do_perform_exc(curl_err)
+            except URLGrabError, ug_err: pass
+            fo._do_close_fo()
+
+            # do progress before callbacks to show retries
+            if meter:
+                m = opts.progress_obj
+                if ug_err:
+                    # progress_obj might not have start()ed yet
+                    m.basename = fo._prog_basename
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += fo._amount_read - opts.size
+                    m.end(fo._amount_read)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if DEBUG: DEBUG.info('success')
+                if opts.checkfunc:
+                    try: _callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err:
+                        if meter:
+                            meter.numfiles += 1
+                            meter.re.total += opts.size
+                if ug_err is None:
+                    # download & checkfunc: OK
+                    continue
+
+            if DEBUG: DEBUG.info('failure: %s', ug_err)
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err: retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                # simple retry
+                start(opts, opts.tries + 1)
+                continue
+
+            if hasattr(opts, 'mirror_group'):
+                mg, gr, mirrorchoice = opts.mirror_group
+                opts.exception = ug_err
+                opts.mirror = mirrorchoice['mirror']
+                opts.relative_url = gr.url
+                try:
+                    mg._failure(gr, opts)
+                    mirrorchoice = mg._get_mirror(gr)
+                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+                    opts.mirror_group = mg, gr, mirrorchoice
+                    # retry next mirror
+                    start(opts, 1)
+                    continue
+                except URLGrabError, ug_err: pass
+
+            # urlgrab failed
+            if not hasattr(opts, 'failfunc'): raise ug_err
+            opts.exception = ug_err
+            _callback(opts.failfunc, opts)
+
+    try:
+        for opts in todo:
+            start(opts, 1)
+            while len(running) >= limit:
+                perform()
+        while running:
+            perform()
+
+    finally:
+        while running:
+            curl, fo = running.popitem()
+            multi.remove_handle(curl)
+            fo._do_close_fo()
+            os.unlink(fo.opts.filename)
+
+    if meter:
+        meter.end()
+
+#####################################################################
 # TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
--
1.7.4.4
_______________________________________________
Yum-devel mailing list
[email protected]
http://lists.baseurl.org/mailman/listinfo/yum-devel