Author: ab Date: Tue Dec 1 15:15:00 2009 New Revision: 885785 URL: http://svn.apache.org/viewvc?rev=885785&view=rev Log: NUTCH-769 Fetcher to skip queues for URLS getting repeated exceptions.
Modified: lucene/nutch/trunk/CHANGES.txt lucene/nutch/trunk/conf/nutch-default.xml lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Modified: lucene/nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=885785&r1=885784&r2=885785&view=diff ============================================================================== --- lucene/nutch/trunk/CHANGES.txt (original) +++ lucene/nutch/trunk/CHANGES.txt Tue Dec 1 15:15:00 2009 @@ -2,6 +2,9 @@ Unreleased Changes +* NUTCH-769 Fetcher to skip queues for URLS getting repeated exceptions + (Julien Nioche via ab) + * NUTCH-768 - Upgrade Nutch 1.0 to use Hadoop 0.20.1, also upgrades Xerces to version 2.9.1. (kubes) Modified: lucene/nutch/trunk/conf/nutch-default.xml URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/conf/nutch-default.xml?rev=885785&r1=885784&r2=885785&view=diff ============================================================================== --- lucene/nutch/trunk/conf/nutch-default.xml (original) +++ lucene/nutch/trunk/conf/nutch-default.xml Tue Dec 1 15:15:00 2009 @@ -610,6 +610,16 @@ </description> </property> +<property> + <name>fetcher.max.exceptions.per.queue</name> + <value>-1</value> + <description>The maximum number of protocol-level exceptions (e.g. timeouts) per + host (or IP) queue. Once this value is reached, any remaining entries from this + queue are purged, effectively stopping the fetching from this host/IP. The default + value of -1 deactivates this limit. 
+ </description> +</property> + <!-- indexer properties --> <property> Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=885785&r1=885784&r2=885785&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Tue Dec 1 15:15:00 2009 @@ -208,6 +208,7 @@ List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>()); Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>()); AtomicLong nextFetchTime = new AtomicLong(); + AtomicInteger exceptionCounter = new AtomicInteger(); long crawlDelay; long minCrawlDelay; int maxThreads; @@ -236,6 +237,10 @@ return inProgress.size(); } + public int incrementExceptionCounter() { + return exceptionCounter.incrementAndGet(); + } + public void finishFetchItem(FetchItem it, boolean asap) { if (it != null) { inProgress.remove(it); @@ -306,6 +311,7 @@ long crawlDelay; long minCrawlDelay; long timelimit = -1; + int maxExceptionsPerQueue = -1; Configuration conf; public FetchItemQueues(Configuration conf) { @@ -316,6 +322,7 @@ this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000); this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000); this.timelimit = conf.getLong("fetcher.timelimit.mins", -1); + this.maxExceptionsPerQueue = conf.getInt("fetcher.max.exceptions.per.queue", -1); } public int getTotalSize() { @@ -402,6 +409,36 @@ return count; } + /** + * Increment the exception counter of a queue in case of an exception e.g. + * timeout; when higher than a given threshold simply empty the queue. 
+ * + * @param queueid + * @return number of purged items + */ + public synchronized int checkExceptionThreshold(String queueid) { + FetchItemQueue fiq = queues.get(queueid); + if (fiq == null) { + return 0; + } + if (fiq.getQueueSize() == 0) { + return 0; + } + int excCount = fiq.incrementExceptionCounter(); + if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) { + // too many exceptions for items in this queue - purge it + int deleted = fiq.emptyQueue(); + LOG.info("* queue: " + queueid + " >> removed " + deleted + + " URLs from queue because " + excCount + " exceptions occurred"); + for (int i = 0; i < deleted; i++) { + totalSize.decrementAndGet(); + } + return deleted; + } + return 0; + } + + public synchronized void dump() { for (String id : queues.keySet()) { FetchItemQueue fiq = queues.get(id); @@ -673,6 +710,8 @@ case ProtocolStatus.EXCEPTION: logError(fit.url, status.getMessage()); + int killedURLs = fetchQueues.checkExceptionThreshold(fit.getQueueID()); + reporter.incrCounter("FetcherStatus", "Exceptions", killedURLs); /* FALLTHROUGH */ case ProtocolStatus.RETRY: // retry case ProtocolStatus.BLOCKED: