Hello,

I've been working on a small Twisted program that makes HTTP requests to a
large number of feeds; Twisted is used to parallelize the fetching. After the
feeds are fetched they're parsed, and finally they should be written to a
database (that part is left out to simplify the code).

Feeds are fetched in parallel: requests are grouped into batches, each batch
is gathered with gatherResults, and a DeferredSemaphore controls the requests
inside each batch. All the batch deferreds are then gathered again into a
single deferred that fires once every batch is done (gatherResults is built
on DeferredList).
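
In case it helps, here is the pattern stripped down to a minimal sketch
(fetch_one is just a stand-in for the real request code):

    from twisted.internet import reactor
    from twisted.internet.defer import DeferredSemaphore, gatherResults
    from twisted.internet.task import deferLater

    def fetch_one(url):
        # stand-in for the real request code (agent.request + readBody)
        return deferLater(reactor, 0, lambda: 'body of ' + url)

    def fetch_batch(urls, concurrency):
        # the semaphore hands out `concurrency` tokens, so at most that many
        # fetch_one() calls run at the same time
        sem = DeferredSemaphore(concurrency)
        deferreds = [sem.run(fetch_one, url) for url in urls]
        # fires with a list of results once every request in the batch is done
        return gatherResults(deferreds, consumeErrors=False)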

Currently the program works fine on 100-150 feeds, with BATCH_SIZE between
5 and 20.

However, I've noticed that the program starts to hang for a long time once
the number of feeds goes over 150-200.

To be more precise, at the end of the run messages like this one are printed,
but the program no longer seems to be doing much:

    Stopping factory <twisted.web.client._HTTP11ClientFactory instance at 0x7f0b7d5f3908>

It seems like this is the cleanup phase.
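
From what I can tell, those messages come from the HTTPConnectionPool tearing
down its cached persistent connections. One thing I was going to try (just a
sketch, I haven't verified that it changes anything) is closing the pool
explicitly and only stopping the reactor once that is done, using the pool
object from the script below:

    def clean_up_and_exit(*args, **kwargs):
        log.msg('clean_up_and_exit')
        # close the pool's cached persistent connections first;
        # closeCachedConnections() returns a deferred that fires when done
        d = pool.closeCachedConnections()
        d.addBoth(lambda ign: reactor.stop())
        return d

If the messages themselves are just log noise, setting noisy = False on
client._HTTP11ClientFactory should hide them, since the Starting/Stopping
factory lines come from the factory's doStart/doStop logging.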

I've read what I could find on the topic, but I wasn't able to make progress,
so I'm posting to the list to ask whether someone has run into this before;
maybe it's a common pitfall that other people have also bumped into.

Thanks

Here is a sample of the urls file (a feed URL followed by one or more tags):
http://mauveweb.co.uk/rss.xml "python"
http://blog.hownowstephen.com/rss "python"
http://blog.codepainters.com/feed/ "python"
http://chase-seibert.github.io/blog/atom.xml "python"
http://www.lshift.net/blog/feed/ "python"
http://django-planet.com/feeds/main/rss/ "python"
http://eflorenzano.com/atom.xml "python"

#!/usr/bin/python
## Uses gatherResults (built on DeferredList) to asynchronously fetch a number of pages
## Note: has problems when too many pages are fetched asynchronously
import sys,re,json,feedparser
from time import strftime
from datetime import datetime
# sys.path.append("rawdog")
from   twisted.internet import reactor
from   twisted.internet.task import deferLater
from   twisted.python import log
from   twisted.web.http_headers import Headers
from   twisted.python.util import println
import twisted.web.client as client
from   twisted.internet.ssl import ClientContextFactory
from   twisted.internet.defer import setDebugging, Deferred, DeferredList, DeferredSemaphore, gatherResults
import psycopg2

# http://txpostgres.readthedocs.org/

global DEBUG_MODE
global agent

DEBUG_MODE=True
setDebugging(DEBUG_MODE)

TIMEOUT_HTTP_CLIENT=3.0
REDIRECT_LIMIT=3
CUSTOM_USER_AGENT='Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/49.0.2623.108 Chrome/49.0.2623.108 Safari/537.36'
pool = client.HTTPConnectionPool(reactor)
# add HTTPS policy, set timeout
baseAgent = client.Agent(reactor, contextFactory=client.BrowserLikePolicyForHTTPS(), connectTimeout=TIMEOUT_HTTP_CLIENT, pool=pool)
# follow redirects (with max limit)
agent = client.BrowserLikeRedirectAgent(baseAgent, redirectLimit=REDIRECT_LIMIT)
# support compression
agent = client.ContentDecoderAgent(agent, [(b'gzip', client.GzipDecoder)])

def getPage(url):
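    # issue a GET through the shared agent; the returned deferred fires with
    # the response body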
    d = agent.request(
            'GET',
            url,
            Headers({
                'User-Agent': [CUSTOM_USER_AGENT],
                # 'Accept-Encoding': ['gzip, deflate'],
                }),
            None,
            )
    # read the full response body once the response headers have arrived
    d.addCallback(client.readBody)
    return d

def set_up_client():
    # set up client with:
    #
    # set Twisted debug level
    # connect to the db
    setDebugging(DEBUG_MODE)
    log.startLogging(sys.stdout)
    # db connection and deferred

def clean_up_and_exit(*args, **kwargs):
    # print "clean_up_and_exit"
    # print args

    log.msg('clean_up_and_exit')
    reactor.stop()

def parse_date(date1, date2):
    # edge-case for feed
    # pubdate is assumed to be a time.struct_time here
    pubdate_fmt = None

    pubdate = date1
    if pubdate:
        try:
            pubdate_fmt = strftime('%Y-%m-%d %H:%M:%S +0000',pubdate)
        except Exception, e:
            print e
            pubdate = None

    # edge-case for atom feed
    # e.get('updated') 2011-02-01T20:21:42+00:00
    pubdate = date2
    if pubdate and not pubdate_fmt:
        try:
            i1 = pubdate[:19]
            i1 = datetime.strptime(i1, "%Y-%m-%dT%H:%M:%S")
            pubdate_fmt = i1.strftime('%Y-%m-%d %H:%M:%S +0000')
        except Exception, e:
            print e
            pubdate = None

    return pubdate_fmt

# parse feeds
def store_fetched_data(data):
    # log.msg(data)
    # reactor.callLater(0, lambda a,b: None, 0, d)
    if data:
        query_data = []
        log.msg('store_fetched_data')
        for f in data:
            if 'response' in f and f['response'] is not None:
                try:
                    r = f['response']
                    # log.msg('len response: ' + str(len(r)))
                    feed = feedparser.parse(r)
                    if 'entries' in feed:
                        log.msg('entries: ' + str(len(feed['entries'])))
                        for e in feed['entries']:
                            try:
                                title = e.get('title','')
                                summary = e.get('summary','')

                                date1 = e.get('published_parsed',None)
                                date2 = e.get('updated', None)

                                pubdate_fmt = parse_date(date1, date2)

                                if not pubdate_fmt:
                                    continue

                                # query_data.append((e['title'],e['summary'],pubdate_fmt,))
                                # log.msg('cnt:' + str(cnt))
                                query_data.append({
                                    'title': title,
                                    'summary': summary,
                                    'pubdate': pubdate_fmt,
                                    })
                            except Exception, exc:
                                print exc
                                pass
                except Exception, e:
                    print e
                    pass
            else:
                print 'feed with empty response'

        if len(query_data) == 0:
            pass
        else:
            # the database insert for query_data would go here
            # (left out to simplify the code, as mentioned above)
            pass
    else:
        # the request was empty
        print 'store_fetched_data received None'
        pass

def fetch_single(feed_meta=None):
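    # fetch one feed and attach the response body (or the Failure from the
    # errback) to its metadata dict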

    def fetched_callback(r):
        feed_complete_data = feed_meta
        feed_complete_data['response'] = r
        return feed_complete_data

    print "scheduling new request ", feed_meta
    d = getPage(feed_meta['feed_url'])
    d.addCallback(fetched_callback)
    # on error the Failure is attached as the 'response' instead of a body
    d.addErrback(fetched_callback)
    return d


def batch_gen(data, batch_size):
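    # yield successive slices of data with at most batch_size feeds each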
    for i in range(0, len(data), batch_size):
        yield data[i:i+batch_size]

def fetch_all(feeds):
    BATCH_SIZE=5
    batches = []
    for feeds_batch in batch_gen(feeds, BATCH_SIZE):
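        # one semaphore token per feed in the batch, so every request in the
        # batch is started right away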
        sem = DeferredSemaphore(len(feeds_batch))
        batch = []
        for feed_ in feeds_batch:
            batch.append(sem.run(fetch_single, feed_meta=feed_))
        batchDef = gatherResults(batch, consumeErrors=False)
        batchDef.addCallback(store_fetched_data)
        batches.append(batchDef)

    # rendez-vous for all feeds that were fetched
    batchesDef = gatherResults(batches, consumeErrors=False)

    batchesDef.addCallbacks(
            clean_up_and_exit,
            # note: errors from the gather are silently swallowed here
            errback=lambda x: None,
            )
    return batchesDef

def parse_feed_config_line(line):
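    # a line in the urls file looks like: <feed_url> "tag1" "tag2" ...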
    parts = re.split(r'\s+', line)
    url = parts[0]
    tags = map(lambda t: re.sub('[\'"]+','', t), parts[1:])
    return {
            'feed_url': url,
            'tags': tags,
            }

def parse_config_data(path=None):
    feeds = []

    with open(path or './urls', 'r') as f:
        entrylim = 99999999
        # entrylim = 3
        entrynum = 1
        for line in f:
            if re.match(r'^http', line):
                line = line.rstrip()
                o = parse_feed_config_line(line)
                feeds.append(o)
                entrynum += 1
                if entrynum > entrylim:
                    break

    return feeds

set_up_client()
feeds_metadata = parse_config_data('urls')
d = fetch_all(feeds_metadata)
reactor.run()
