Hello, I've been working on a small Twisted program that makes HTTP requests to a large number of feeds; Twisted is used to fetch them concurrently and speed the whole process up. After the feeds are fetched they are parsed, and finally the results should be written to a database (that part is left out to keep the code short).
The feeds are split into batches. Within each batch the fetches run in parallel under a DeferredSemaphore and are collected with gatherResults; the resulting per-batch deferreds are then gathered again into one deferred that acts as a rendez-vous for the whole run. Currently the program works fine on 100-150 feeds, with BATCH_SIZE between 5 and 20. However, once the number of feeds goes over 150-200, the program starts to hang for a long time: at the end of the run, messages like the following are printed, but the process no longer seems to be doing much:

Stopping factory <twisted.web.client._HTTP11ClientFactory instance at 0x7f0b7d5f3908>

It looks like this is the cleanup phase. I've read what I could find on the topic but wasn't able to make progress, so I'm posting to the mailing list to ask whether someone has encountered this before; maybe it's a common pitfall other people have also bumped into. Thanks
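In outline, the wiring looks like this (condensed from the full listing at the end of this mail):

    batches = []
    for feeds_batch in batch_gen(feeds, BATCH_SIZE):
        sem = DeferredSemaphore(len(feeds_batch))
        batch = [sem.run(fetch_single, feed_meta=f) for f in feeds_batch]
        batchDef = gatherResults(batch)        # rendez-vous per batch
        batchDef.addCallback(store_fetched_data)
        batches.append(batchDef)
    batchesDef = gatherResults(batches)        # rendez-vous for the whole run
    batchesDef.addCallbacks(clean_up_and_exit, errback=lambda x: None)

The urls file has one feed URL per line, followed by quoted tags: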
http://mauveweb.co.uk/rss.xml "python"
http://blog.hownowstephen.com/rss "python"
http://blog.codepainters.com/feed/ "python"
http://chase-seibert.github.io/blog/atom.xml "python"
http://www.lshift.net/blog/feed/ "python"
http://django-planet.com/feeds/main/rss/ "python"
http://eflorenzano.com/atom.xml "python"
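Each of those lines is parsed into a dict like {'feed_url': 'http://mauveweb.co.uk/rss.xml', 'tags': ['python']} (see parse_feed_config_line in the listing). Here is the full program: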
#!/usr/bin/python
## Uses gatherResults/DeferredList to asynchronously fetch a number of pages
## Note: has problems if too many pages are being fetched asynchronously

import sys, re, json, feedparser
from time import strftime
from datetime import datetime
# sys.path.append("rawdog")
from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.python import log
from twisted.web.http_headers import Headers
from twisted.python.util import println
import twisted.web.client as client
from twisted.internet.ssl import ClientContextFactory
from twisted.internet.defer import setDebugging, Deferred, DeferredList, \
    DeferredSemaphore, gatherResults
import psycopg2  # http://txpostgres.readthedocs.org/

DEBUG_MODE = True
setDebugging(DEBUG_MODE)

TIMEOUT_HTTP_CLIENT = 3.0
REDIRECT_LIMIT = 3
CUSTOM_USER_AGENT = ('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
                     '(KHTML, like Gecko) Ubuntu Chromium/49.0.2623.108 '
                     'Chrome/49.0.2623.108 Safari/537.36')

# persistent connection pool shared by all requests
pool = client.HTTPConnectionPool(reactor)
# add HTTPS policy, set connect timeout
baseAgent = client.Agent(reactor,
                         contextFactory=client.BrowserLikePolicyForHTTPS(),
                         connectTimeout=TIMEOUT_HTTP_CLIENT,
                         pool=pool)
# follow redirects (with max limit)
agent = client.BrowserLikeRedirectAgent(baseAgent, redirectLimit=REDIRECT_LIMIT)
# support compression
agent = client.ContentDecoderAgent(agent, [(b'gzip', client.GzipDecoder)])


def getPage(url):
    d = agent.request(
        'GET',
        url,
        Headers({
            'User-Agent': [CUSTOM_USER_AGENT],
            # 'Accept-Encoding': ['gzip, deflate'],
        }),
        None,
    )
    # once the response headers are in, read the whole body
    d.addCallback(client.readBody)
    return d


def set_up_client():
    # set up client with:
    # - Twisted debug level
    # - logging to stdout
    setDebugging(DEBUG_MODE)
    log.startLogging(sys.stdout)
    # db connection and deferred would be set up here


def clean_up_and_exit(*args, **kwargs):
    log.msg('clean_up_and_exit')
    reactor.stop()


def parse_date(date1, date2):
    # edge case for RSS feeds: published_parsed is assumed to be a
    # time.struct_time here
    pubdate_fmt = None
    pubdate = date1
    if pubdate:
        try:
            pubdate_fmt = strftime('%Y-%m-%d %H:%M:%S +0000', pubdate)
        except Exception, e:
            print e
            pubdate = None
    # edge case for Atom feeds: e.get('updated') is a string like
    # 2011-02-01T20:21:42+00:00
    pubdate = date2
    if pubdate and not pubdate_fmt:
        try:
            i1 = pubdate[:19]
            i1 = datetime.strptime(i1, "%Y-%m-%dT%H:%M:%S")
            pubdate_fmt = i1.strftime('%Y-%m-%d %H:%M:%S +0000')
        except Exception, e:
            print e
            pubdate = None
    return pubdate_fmt


# parse the fetched feeds; the actual db write is left out
def store_fetched_data(data):
    if data:
        query_data = []
        log.msg('store_fetched_data')
        for f in data:
            if 'response' in f and f['response'] is not None:
                try:
                    r = f['response']
                    feed = feedparser.parse(r)
                    if 'entries' in feed:
                        log.msg('entries: ' + str(len(feed['entries'])))
                        for e in feed['entries']:
                            try:
                                title = e.get('title', '')
                                summary = e.get('summary', '')
                                date1 = e.get('published_parsed', None)
                                date2 = e.get('updated', None)
                                pubdate_fmt = parse_date(date1, date2)
                                if not pubdate_fmt:
                                    continue
                                query_data.append({
                                    'title': title,
                                    'summary': summary,
                                    'pubdate': pubdate_fmt,
                                })
                            except Exception, e:
                                print e
                except Exception, e:
                    print e
            else:
                print 'feed with empty response'
        # db insert of query_data would go here
        if len(query_data) == 0:
            pass
        else:
            pass
    else:
        # the request was empty
        print 'store_fetched_data received None'


def fetch_single(feed_meta=None):
    def fetched_callback(r):
        # r is the response body on success and a Failure on error;
        # either way it is attached to the feed's metadata
        feed_complete_data = feed_meta
        feed_complete_data['response'] = r
        return feed_complete_data

    print "scheduling new request ", feed_meta
    d = getPage(feed_meta['feed_url'])
    d.addCallbacks(fetched_callback, fetched_callback)
    return d


def batch_gen(data, batch_size):
    for i in range(0, len(data), batch_size):
        yield data[i:i + batch_size]


def fetch_all(feeds):
    BATCH_SIZE = 5
    batches = []
    for feeds_batch in batch_gen(feeds, BATCH_SIZE):
        sem = DeferredSemaphore(len(feeds_batch))
        batch = []
        for feed_ in feeds_batch:
            batch.append(sem.run(fetch_single, feed_meta=feed_))
        batchDef = gatherResults(batch, consumeErrors=False)
        batchDef.addCallback(store_fetched_data)
        batches.append(batchDef)
    # rendez-vous for all feeds that were fetched; note that on error the
    # errback swallows the failure and the reactor is never stopped
    batchesDef = gatherResults(batches, consumeErrors=False)
    batchesDef.addCallbacks(
        clean_up_and_exit,
        errback=lambda x: None,
    )
    return batchesDef


def parse_feed_config_line(line):
    parts = re.split(r'\s+', line)
    url = parts[0]
    tags = map(lambda t: re.sub('[\'"]+', '', t), parts[1:])
    return {
        'feed_url': url,
        'tags': tags,
    }


def parse_config_data(path=None):
    feeds = []
    with open(path, 'r') as f:
        entrylim = 99999999
        # entrylim = 3
        entrynum = 1
        for line in f:
            if re.match(r'^http', line):
                line = line.rstrip()
                o = parse_feed_config_line(line)
                feeds.append(o)
                entrynum += 1
                if entrynum > entrylim:
                    break
    return feeds


set_up_client()
feeds_metadata = parse_config_data('./urls')
d = fetch_all(feeds_metadata)
reactor.run()
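PS: one guess I had while writing this up: the "Stopping factory" lines come from the per-connection HTTP/1.1 client factories, and the shared HTTPConnectionPool keeps persistent connections cached, so maybe those cached connections are what keeps the process busy during shutdown. I haven't verified this; a minimal sketch of what I would try, assuming the hang really is pool-related, is to close the pool's cached connections before stopping the reactor (closeCachedConnections() is an existing HTTPConnectionPool method; its placement here is just my guess):

    def clean_up_and_exit(*args, **kwargs):
        log.msg('clean_up_and_exit')
        # closeCachedConnections() returns a Deferred that fires once all
        # cached persistent connections are torn down; only stop the
        # reactor after that
        d = pool.closeCachedConnections()
        d.addBoth(lambda _: reactor.stop())
        return d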
_______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python