Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original) +++ nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Thu Jan 29 05:38:59 2015 @@ -55,37 +55,40 @@ import crawlercommons.robots.BaseRobotRu /** * A queue-based fetcher. - * - * <p>This fetcher uses a well-known model of one producer (a QueueFeeder) - * and many consumers (FetcherThread-s). - * - * <p>QueueFeeder reads input fetchlists and - * populates a set of FetchItemQueue-s, which hold FetchItem-s that - * describe the items to be fetched. There are as many queues as there are unique - * hosts, but at any given time the total number of fetch items in all queues - * is less than a fixed number (currently set to a multiple of the number of - * threads). - * - * <p>As items are consumed from the queues, the QueueFeeder continues to add new + * + * <p> + * This fetcher uses a well-known model of one producer (a QueueFeeder) and many + * consumers (FetcherThread-s). + * + * <p> + * QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s, + * which hold FetchItem-s that describe the items to be fetched. There are as + * many queues as there are unique hosts, but at any given time the total number + * of fetch items in all queues is less than a fixed number (currently set to a + * multiple of the number of threads). + * + * <p> + * As items are consumed from the queues, the QueueFeeder continues to add new * input items, so that their total count stays fixed (FetcherThread-s may also * add new items to the queues e.g. as a results of redirection) - until all * input items are exhausted, at which point the number of items in the queues * begins to decrease. When this number reaches 0 fetcher will finish. - * - * <p>This fetcher implementation handles per-host blocking itself, instead - * of delegating this work to protocol-specific plugins. - * Each per-host queue handles its own "politeness" settings, such as the - * maximum number of concurrent requests and crawl delay between consecutive - * requests - and also a list of requests in progress, and the time the last - * request was finished. As FetcherThread-s ask for new items to be fetched, - * queues may return eligible items or null if for "politeness" reasons this - * host's queue is not yet ready. - * - * <p>If there are still unfetched items in the queues, but none of the items - * are ready, FetcherThread-s will spin-wait until either some items become + * + * <p> + * This fetcher implementation handles per-host blocking itself, instead of + * delegating this work to protocol-specific plugins. Each per-host queue + * handles its own "politeness" settings, such as the maximum number of + * concurrent requests and crawl delay between consecutive requests - and also a + * list of requests in progress, and the time the last request was finished. As + * FetcherThread-s ask for new items to be fetched, queues may return eligible + * items or null if for "politeness" reasons this host's queue is not yet ready. + * + * <p> + * If there are still unfetched items in the queues, but none of the items are + * ready, FetcherThread-s will spin-wait until either some items become * available, or a timeout is reached (at which point the Fetcher will abort, * assuming the task is hung). - * + * * @author Andrzej Bialecki */ public class Fetcher extends Configured implements Tool, @@ -99,16 +102,16 @@ public class Fetcher extends Configured public static final Logger LOG = LoggerFactory.getLogger(Fetcher.class); - public static class InputFormat extends SequenceFileInputFormat<Text, CrawlDatum> { + public static class InputFormat extends + SequenceFileInputFormat<Text, CrawlDatum> { /** Don't split inputs, to keep things polite. */ - public InputSplit[] getSplits(JobConf job, int nSplits) - throws IOException { + public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException { FileStatus[] files = listStatus(job); FileSplit[] splits = new FileSplit[files.length]; for (int i = 0; i < files.length; i++) { FileStatus cur = files[i]; - splits[i] = new FileSplit(cur.getPath(), 0, - cur.getLen(), (String[])null); + splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(), + (String[]) null); } return splits; } @@ -124,8 +127,8 @@ public class Fetcher extends Configured private long start = System.currentTimeMillis(); // start time of fetcher run private AtomicLong lastRequestStart = new AtomicLong(start); - private AtomicLong bytes = new AtomicLong(0); // total bytes fetched - private AtomicInteger pages = new AtomicInteger(0); // total pages fetched + private AtomicLong bytes = new AtomicLong(0); // total bytes fetched + private AtomicInteger pages = new AtomicInteger(0); // total pages fetched private AtomicInteger errors = new AtomicInteger(0); // total pages errored private boolean storingContent; @@ -149,7 +152,8 @@ public class Fetcher extends Configured this(url, u, datum, queueID, 0); } - public FetchItem(Text url, URL u, CrawlDatum datum, String queueID, int outlinkDepth) { + public FetchItem(Text url, URL u, CrawlDatum datum, String queueID, + int outlinkDepth) { this.url = url; this.u = u; this.datum = datum; @@ -157,15 +161,17 @@ public class Fetcher extends Configured this.outlinkDepth = outlinkDepth; } - /** Create an item. Queue id will be created based on <code>queueMode</code> - * argument, either as a protocol + hostname pair, protocol + IP - * address pair or protocol+domain pair. + /** + * Create an item. Queue id will be created based on <code>queueMode</code> + * argument, either as a protocol + hostname pair, protocol + IP address + * pair or protocol+domain pair. */ - public static FetchItem create(Text url, CrawlDatum datum, String queueMode) { + public static FetchItem create(Text url, CrawlDatum datum, String queueMode) { return create(url, datum, queueMode, 0); } - public static FetchItem create(Text url, CrawlDatum datum, String queueMode, int outlinkDepth) { + public static FetchItem create(Text url, CrawlDatum datum, + String queueMode, int outlinkDepth) { String queueID; URL u = null; try { @@ -185,19 +191,18 @@ public class Fetcher extends Configured LOG.warn("Unable to resolve: " + u.getHost() + ", skipping."); return null; } - } - else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)){ + } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) { key = URLUtil.getDomainName(u); if (key == null) { - LOG.warn("Unknown domain for url: " + url + ", using URL string as key"); - key=u.toExternalForm(); + LOG.warn("Unknown domain for url: " + url + + ", using URL string as key"); + key = u.toExternalForm(); } - } - else { + } else { key = u.getHost(); if (key == null) { LOG.warn("Unknown host for url: " + url + ", using URL string as key"); - key=u.toExternalForm(); + key = u.toExternalForm(); } } queueID = proto + "://" + key.toLowerCase(); @@ -222,13 +227,14 @@ public class Fetcher extends Configured } /** - * This class handles FetchItems which come from the same host ID (be it - * a proto/hostname or proto/IP pair). It also keeps track of requests in + * This class handles FetchItems which come from the same host ID (be it a + * proto/hostname or proto/IP pair). It also keeps track of requests in * progress and elapsed time between requests. */ private static class FetchItemQueue { - List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>()); - AtomicInteger inProgress = new AtomicInteger(); + List<FetchItem> queue = Collections + .synchronizedList(new LinkedList<FetchItem>()); + AtomicInteger inProgress = new AtomicInteger(); AtomicLong nextFetchTime = new AtomicLong(); AtomicInteger exceptionCounter = new AtomicInteger(); long crawlDelay; @@ -236,7 +242,8 @@ public class Fetcher extends Configured int maxThreads; Configuration conf; - public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) { + public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, + long minCrawlDelay) { this.conf = conf; this.maxThreads = maxThreads; this.crawlDelay = crawlDelay; @@ -271,26 +278,33 @@ public class Fetcher extends Configured } public void addFetchItem(FetchItem it) { - if (it == null) return; + if (it == null) + return; queue.add(it); } public void addInProgressFetchItem(FetchItem it) { - if (it == null) return; + if (it == null) + return; inProgress.incrementAndGet(); } public FetchItem getFetchItem() { - if (inProgress.get() >= maxThreads) return null; + if (inProgress.get() >= maxThreads) + return null; long now = System.currentTimeMillis(); - if (nextFetchTime.get() > now) return null; + if (nextFetchTime.get() > now) + return null; FetchItem it = null; - if (queue.size() == 0) return null; + if (queue.size() == 0) + return null; try { it = queue.remove(0); inProgress.incrementAndGet(); } catch (Exception e) { - LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e); + LOG.error( + "Cannot remove FetchItem from queue or cannot add it to inProgress queue", + e); } return it; } @@ -314,7 +328,8 @@ public class Fetcher extends Configured private void setEndTime(long endTime, boolean asap) { if (!asap) - nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay)); + nextFetchTime.set(endTime + + (maxThreads > 1 ? minCrawlDelay : crawlDelay)); else nextFetchTime.set(endTime); } @@ -346,17 +361,21 @@ public class Fetcher extends Configured this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1); queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST); // check that the mode is known - if (!queueMode.equals(QUEUE_MODE_IP) && !queueMode.equals(QUEUE_MODE_DOMAIN) + if (!queueMode.equals(QUEUE_MODE_IP) + && !queueMode.equals(QUEUE_MODE_DOMAIN) && !queueMode.equals(QUEUE_MODE_HOST)) { - LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost"); + LOG.error("Unknown partition mode : " + queueMode + + " - forcing to byHost"); queueMode = QUEUE_MODE_HOST; } - LOG.info("Using queue mode : "+queueMode); + LOG.info("Using queue mode : " + queueMode); this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000); - this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000); + this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", + 0.0f) * 1000); this.timelimit = conf.getLong("fetcher.timelimit", -1); - this.maxExceptionsPerQueue = conf.getInt("fetcher.max.exceptions.per.queue", -1); + this.maxExceptionsPerQueue = conf.getInt( + "fetcher.max.exceptions.per.queue", -1); } public int getTotalSize() { @@ -369,7 +388,8 @@ public class Fetcher extends Configured public void addFetchItem(Text url, CrawlDatum datum) { FetchItem it = FetchItem.create(url, datum, queueMode); - if (it != null) addFetchItem(it); + if (it != null) + addFetchItem(it); } public synchronized void addFetchItem(FetchItem it) { @@ -402,8 +422,8 @@ public class Fetcher extends Configured } public synchronized FetchItem getFetchItem() { - Iterator<Map.Entry<String, FetchItemQueue>> it = - queues.entrySet().iterator(); + Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet() + .iterator(); while (it.hasNext()) { FetchItemQueue fiq = it.next().getValue(); // reap empty queues @@ -431,7 +451,8 @@ public class Fetcher extends Configured // there might also be a case where totalsize !=0 but number of queues // == 0 // in which case we simply force it to 0 to avoid blocking - if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0); + if (totalSize.get() != 0 && queues.size() == 0) + totalSize.set(0); } return count; } @@ -442,7 +463,8 @@ public class Fetcher extends Configured for (String id : queues.keySet()) { FetchItemQueue fiq = queues.get(id); - if (fiq.getQueueSize() == 0) continue; + if (fiq.getQueueSize() == 0) + continue; LOG.info("* queue: " + id + " >> dropping! "); int deleted = fiq.emptyQueue(); for (int i = 0; i < deleted; i++) { @@ -457,7 +479,7 @@ public class Fetcher extends Configured /** * Increment the exception counter of a queue in case of an exception e.g. * timeout; when higher than a given threshold simply empty the queue. - * + * * @param queueid * @return number of purged items */ @@ -470,7 +492,7 @@ public class Fetcher extends Configured return 0; } int excCount = fiq.incrementExceptionCounter(); - if (maxExceptionsPerQueue!= -1 && excCount >= maxExceptionsPerQueue) { + if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) { // too many exceptions for items in this queue - purge it int deleted = fiq.emptyQueue(); LOG.info("* queue: " + queueid + " >> removed " + deleted @@ -483,11 +505,11 @@ public class Fetcher extends Configured return 0; } - public synchronized void dump() { for (String id : queues.keySet()) { FetchItemQueue fiq = queues.get(id); - if (fiq.getQueueSize() == 0) continue; + if (fiq.getQueueSize() == 0) + continue; LOG.info("* queue: " + id); fiq.dump(); } @@ -495,8 +517,8 @@ public class Fetcher extends Configured } /** - * This class feeds the queues with input items, and re-fills them as - * items are consumed by FetcherThread-s. + * This class feeds the queues with input items, and re-fills them as items + * are consumed by FetcherThread-s. */ private static class QueueFeeder extends Thread { private RecordReader<Text, CrawlDatum> reader; @@ -541,7 +563,9 @@ public class Fetcher extends Configured // queues are full - spin-wait until they have some free space try { Thread.sleep(1000); - } catch (Exception e) {}; + } catch (Exception e) { + } + ; continue; } else { LOG.debug("-feeding " + feed + " input urls ..."); @@ -562,8 +586,8 @@ public class Fetcher extends Configured } } } - LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :" - + timelimitcount); + LOG.info("QueueFeeder finished: total " + cnt + + " records + hit by time limit :" + timelimitcount); } } @@ -595,12 +619,12 @@ public class Fetcher extends Configured private int outlinksDepthDivisor; private boolean skipTruncated; - + private boolean halted = false; public FetcherThread(Configuration conf) { - this.setDaemon(true); // don't hang JVM on exit - this.setName("FetcherThread"); // use an informative name + this.setDaemon(true); // don't hang JVM on exit + this.setName("FetcherThread"); // use an informative name this.conf = conf; this.urlFilters = new URLFilters(conf); this.scfilters = new ScoringFilters(conf); @@ -609,26 +633,33 @@ public class Fetcher extends Configured this.protocolFactory = new ProtocolFactory(conf); this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER); this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000; - queueMode = conf.get("fetcher.queue.mode", FetchItemQueues.QUEUE_MODE_HOST); + queueMode = conf.get("fetcher.queue.mode", + FetchItemQueues.QUEUE_MODE_HOST); // check that the mode is known - if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP) && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN) + if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP) + && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN) && !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) { - LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost"); + LOG.error("Unknown partition mode : " + queueMode + + " - forcing to byHost"); queueMode = FetchItemQueues.QUEUE_MODE_HOST; } - LOG.info("Using queue mode : "+queueMode); + LOG.info("Using queue mode : " + queueMode); this.maxRedirect = conf.getInt("http.redirect.max", 3); - this.ignoreExternalLinks = - conf.getBoolean("db.ignore.external.links", false); + this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", + false); maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100); - maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE : maxOutlinksPerPage; + maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE + : maxOutlinksPerPage; interval = conf.getInt("db.fetch.interval.default", 2592000); ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false); maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1); - outlinksIgnoreExternal = conf.getBoolean("fetcher.follow.outlinks.ignore.external", false); - maxOutlinkDepthNumLinks = conf.getInt("fetcher.follow.outlinks.num.links", 4); - outlinksDepthDivisor = conf.getInt("fetcher.follow.outlinks.depth.divisor", 2); + outlinksIgnoreExternal = conf.getBoolean( + "fetcher.follow.outlinks.ignore.external", false); + maxOutlinkDepthNumLinks = conf.getInt( + "fetcher.follow.outlinks.num.links", 4); + outlinksDepthDivisor = conf.getInt( + "fetcher.follow.outlinks.depth.divisor", 2); } @SuppressWarnings("fallthrough") @@ -645,7 +676,7 @@ public class Fetcher extends Configured fit = null; return; } - + fit = fetchQueues.getFetchItem(); if (fit == null) { if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) { @@ -654,8 +685,9 @@ public class Fetcher extends Configured spinWaiting.incrementAndGet(); try { Thread.sleep(500); - } catch (Exception e) {} - spinWaiting.decrementAndGet(); + } catch (Exception e) { + } + spinWaiting.decrementAndGet(); continue; } else { // all done, finish this thread @@ -664,8 +696,8 @@ public class Fetcher extends Configured } } lastRequestStart.set(System.currentTimeMillis()); - Text reprUrlWritable = - (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY); + Text reprUrlWritable = (Text) fit.datum.getMetaData().get( + Nutch.WRITABLE_REPR_URL_KEY); if (reprUrlWritable == null) { reprUrl = fit.url.toString(); } else { @@ -677,14 +709,16 @@ public class Fetcher extends Configured redirectCount = 0; do { if (LOG.isInfoEnabled()) { - LOG.info("fetching " + fit.url + " (queue crawl delay=" + - fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay + "ms)"); + LOG.info("fetching " + fit.url + " (queue crawl delay=" + + fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay + + "ms)"); } if (LOG.isDebugEnabled()) { LOG.debug("redirectCount=" + redirectCount); } redirecting = false; - Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString()); + Protocol protocol = this.protocolFactory.getProtocol(fit.url + .toString()); BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum); if (!rules.isAllowed(fit.u.toString())) { // unblock @@ -692,7 +726,9 @@ public class Fetcher extends Configured if (LOG.isDebugEnabled()) { LOG.debug("Denied by robots.txt: " + fit.url); } - output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE); + output(fit.url, fit.datum, null, + ProtocolStatus.STATUS_ROBOTS_DENIED, + CrawlDatum.STATUS_FETCH_GONE); reporter.incrCounter("FetcherStatus", "robots_denied", 1); continue; } @@ -700,19 +736,27 @@ public class Fetcher extends Configured if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) { // unblock fetchQueues.finishFetchItem(fit, true); - LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping"); - output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE); - reporter.incrCounter("FetcherStatus", "robots_denied_maxcrawldelay", 1); + LOG.debug("Crawl-Delay for " + fit.url + " too long (" + + rules.getCrawlDelay() + "), skipping"); + output(fit.url, fit.datum, null, + ProtocolStatus.STATUS_ROBOTS_DENIED, + CrawlDatum.STATUS_FETCH_GONE); + reporter.incrCounter("FetcherStatus", + "robots_denied_maxcrawldelay", 1); continue; } else { - FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID); + FetchItemQueue fiq = fetchQueues + .getFetchItemQueue(fit.queueID); fiq.crawlDelay = rules.getCrawlDelay(); if (LOG.isDebugEnabled()) { - LOG.info("Crawl delay for queue: " + fit.queueID + " is set to " + fiq.crawlDelay + " as per robots.txt. url: " + fit.url); + LOG.info("Crawl delay for queue: " + fit.queueID + + " is set to " + fiq.crawlDelay + + " as per robots.txt. url: " + fit.url); } } } - ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum); + ProtocolOutput output = protocol.getProtocolOutput(fit.url, + fit.datum); ProtocolStatus status = output.getStatus(); Content content = output.getContent(); ParseStatus pstatus = null; @@ -723,32 +767,31 @@ public class Fetcher extends Configured reporter.incrCounter("FetcherStatus", status.getName(), 1); - switch(status.getCode()) { + switch (status.getCode()) { case ProtocolStatus.WOULDBLOCK: // retry ? fetchQueues.addFetchItem(fit); break; - case ProtocolStatus.SUCCESS: // got a page - pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth); + case ProtocolStatus.SUCCESS: // got a page + pstatus = output(fit.url, fit.datum, content, status, + CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth); updateStatus(content.getContent().length); - if (pstatus != null && pstatus.isSuccess() && - pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) { + if (pstatus != null && pstatus.isSuccess() + && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) { String newUrl = pstatus.getMessage(); int refreshTime = Integer.valueOf(pstatus.getArgs()[1]); - Text redirUrl = - handleRedirect(fit.url, fit.datum, - urlString, newUrl, - refreshTime < Fetcher.PERM_REFRESH_TIME, - Fetcher.CONTENT_REDIR); + Text redirUrl = handleRedirect(fit.url, fit.datum, urlString, + newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME, + Fetcher.CONTENT_REDIR); if (redirUrl != null) { queueRedirect(redirUrl, fit); } } break; - case ProtocolStatus.MOVED: // redirect + case ProtocolStatus.MOVED: // redirect case ProtocolStatus.TEMP_MOVED: int code; boolean temp; @@ -761,10 +804,8 @@ public class Fetcher extends Configured } output(fit.url, fit.datum, content, status, code); String newUrl = status.getMessage(); - Text redirUrl = - handleRedirect(fit.url, fit.datum, - urlString, newUrl, temp, - Fetcher.PROTOCOL_REDIR); + Text redirUrl = handleRedirect(fit.url, fit.datum, urlString, + newUrl, temp, Fetcher.PROTOCOL_REDIR); if (redirUrl != null) { queueRedirect(redirUrl, fit); } else { @@ -775,31 +816,37 @@ public class Fetcher extends Configured case ProtocolStatus.EXCEPTION: logError(fit.url, status.getMessage()); - int killedURLs = fetchQueues.checkExceptionThreshold(fit.getQueueID()); - if (killedURLs!=0) - reporter.incrCounter("FetcherStatus", "AboveExceptionThresholdInQueue", killedURLs); + int killedURLs = fetchQueues.checkExceptionThreshold(fit + .getQueueID()); + if (killedURLs != 0) + reporter.incrCounter("FetcherStatus", + "AboveExceptionThresholdInQueue", killedURLs); /* FALLTHROUGH */ - case ProtocolStatus.RETRY: // retry + case ProtocolStatus.RETRY: // retry case ProtocolStatus.BLOCKED: - output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY); + output(fit.url, fit.datum, null, status, + CrawlDatum.STATUS_FETCH_RETRY); break; - case ProtocolStatus.GONE: // gone + case ProtocolStatus.GONE: // gone case ProtocolStatus.NOTFOUND: case ProtocolStatus.ACCESS_DENIED: case ProtocolStatus.ROBOTS_DENIED: - output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE); + output(fit.url, fit.datum, null, status, + CrawlDatum.STATUS_FETCH_GONE); break; case ProtocolStatus.NOTMODIFIED: - output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED); + output(fit.url, fit.datum, null, status, + CrawlDatum.STATUS_FETCH_NOTMODIFIED); break; default: if (LOG.isWarnEnabled()) { LOG.warn("Unknown ProtocolStatus: " + status.getCode()); } - output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY); + output(fit.url, fit.datum, null, status, + CrawlDatum.STATUS_FETCH_RETRY); } if (redirecting && redirectCount > maxRedirect) { @@ -807,34 +854,38 @@ public class Fetcher extends Configured if (LOG.isInfoEnabled()) { LOG.info(" - redirect count exceeded " + fit.url); } - output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE); + output(fit.url, fit.datum, null, + ProtocolStatus.STATUS_REDIR_EXCEEDED, + CrawlDatum.STATUS_FETCH_GONE); } } while (redirecting && (redirectCount <= maxRedirect)); - } catch (Throwable t) { // unexpected exception + } catch (Throwable t) { // unexpected exception // unblock fetchQueues.finishFetchItem(fit); logError(fit.url, StringUtils.stringifyException(t)); - output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY); + output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, + CrawlDatum.STATUS_FETCH_RETRY); } } } catch (Throwable e) { if (LOG.isErrorEnabled()) { - LOG.error("fetcher caught:"+e.toString()); + LOG.error("fetcher caught:" + e.toString()); } } finally { - if (fit != null) fetchQueues.finishFetchItem(fit); + if (fit != null) + fetchQueues.finishFetchItem(fit); activeThreads.decrementAndGet(); // count threads - LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads); + LOG.info("-finishing thread " + getName() + ", activeThreads=" + + activeThreads); } } - private Text handleRedirect(Text url, CrawlDatum datum, - String urlString, String newUrl, - boolean temp, String redirType) - throws MalformedURLException, URLFilterException { + private Text handleRedirect(Text url, CrawlDatum datum, String urlString, + String newUrl, boolean temp, String redirType) + throws MalformedURLException, URLFilterException { newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER); newUrl = urlFilters.filter(newUrl); @@ -844,13 +895,14 @@ public class Fetcher extends Configured String newHost = new URL(newUrl).getHost().toLowerCase(); if (!origHost.equals(newHost)) { if (LOG.isDebugEnabled()) { - LOG.debug(" - ignoring redirect " + redirType + " from " + - urlString + " to " + newUrl + - " because external links are ignored"); + LOG.debug(" - ignoring redirect " + redirType + " from " + + urlString + " to " + newUrl + + " because external links are ignored"); } return null; } - } catch (MalformedURLException e) { } + } catch (MalformedURLException e) { + } } if (newUrl != null && !newUrl.equals(urlString)) { @@ -860,13 +912,13 @@ public class Fetcher extends Configured redirecting = true; redirectCount++; if (LOG.isDebugEnabled()) { - LOG.debug(" - " + redirType + " redirect to " + - url + " (fetching now)"); + LOG.debug(" - " + redirType + " redirect to " + url + + " (fetching now)"); } return url; } else { CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED, - datum.getFetchInterval(),datum.getScore()); + datum.getFetchInterval(), datum.getScore()); // transfer existing metadata newDatum.getMetaData().putAll(datum.getMetaData()); try { @@ -880,21 +932,22 @@ public class Fetcher extends Configured } output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED); if (LOG.isDebugEnabled()) { - LOG.debug(" - " + redirType + " redirect to " + - url + " (fetching later)"); + LOG.debug(" - " + redirType + " redirect to " + url + + " (fetching later)"); } return null; } } else { if (LOG.isDebugEnabled()) { - LOG.debug(" - " + redirType + " redirect skipped: " + - (newUrl != null ? "to same url" : "filtered")); + LOG.debug(" - " + redirType + " redirect skipped: " + + (newUrl != null ? "to same url" : "filtered")); } return null; } } - private void queueRedirect(Text redirUrl, FetchItem fit) throws ScoringFilterException { + private void queueRedirect(Text redirUrl, FetchItem fit) + throws ScoringFilterException { CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED, fit.datum.getFetchInterval(), fit.datum.getScore()); // transfer all existing metadata to the redirect @@ -906,13 +959,13 @@ public class Fetcher extends Configured } fit = FetchItem.create(redirUrl, newDatum, queueMode); if (fit != null) { - FetchItemQueue fiq = - fetchQueues.getFetchItemQueue(fit.queueID); + FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID); fiq.addInProgressFetchItem(fit); } else { // stop redirecting redirecting = false; - reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", 1); + reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", + 1); } } @@ -923,26 +976,29 @@ public class Fetcher extends Configured errors.incrementAndGet(); } - private ParseStatus output(Text key, CrawlDatum datum, - Content content, ProtocolStatus pstatus, int status) { + private ParseStatus output(Text key, CrawlDatum datum, Content content, + ProtocolStatus pstatus, int status) { return output(key, datum, content, pstatus, status, 0); } - private ParseStatus output(Text key, CrawlDatum datum, - Content content, ProtocolStatus pstatus, int status, int outlinkDepth) { + private ParseStatus output(Text key, CrawlDatum datum, Content content, + ProtocolStatus pstatus, int status, int outlinkDepth) { datum.setStatus(status); datum.setFetchTime(System.currentTimeMillis()); - if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus); - + if (pstatus != null) + datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus); + ParseResult parseResult = null; if (content != null) { Metadata metadata = content.getMetadata(); - + // store the guessed content type in the crawldatum - if (content.getContentType() != null) datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE), new Text(content.getContentType())); - + if (content.getContentType() != null) + datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE), + new Text(content.getContentType())); + // add segment to metadata metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName); // add score to content metadata so that ParseSegment can pick it up. @@ -953,29 +1009,34 @@ public class Fetcher extends Configured LOG.warn("Couldn't pass score, url " + key + " (" + e + ")"); } } - /* Note: Fetcher will only follow meta-redirects coming from the - * original URL. */ + /* + * Note: Fetcher will only follow meta-redirects coming from the + * original URL. + */ if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) { - if (!skipTruncated || (skipTruncated && !ParseSegment.isTruncated(content))) { + if (!skipTruncated + || (skipTruncated && !ParseSegment.isTruncated(content))) { try { parseResult = this.parseUtil.parse(content); } catch (Exception e) { - LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e)); + LOG.warn("Error parsing: " + key + ": " + + StringUtils.stringifyException(e)); } } - + if (parseResult == null) { - byte[] signature = - SignatureFactory.getSignature(getConf()).calculate(content, - new ParseStatus().getEmptyParse(conf)); + byte[] signature = SignatureFactory.getSignature(getConf()) + .calculate(content, new ParseStatus().getEmptyParse(conf)); datum.setSignature(signature); } } - /* Store status code in content So we can read this value during - * parsing (as a separate job) and decide to parse or not. + /* + * Store status code in content So we can read this value during parsing + * (as a separate job) and decide to parse or not. */ - content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status)); + content.getMetadata().add(Nutch.FETCH_STATUS_KEY, + Integer.toString(status)); } try { @@ -996,11 +1057,10 @@ public class Fetcher extends Configured // Calculate page signature. For non-parsing fetchers this will // be done in ParseSegment - byte[] signature = - SignatureFactory.getSignature(getConf()).calculate(content, parse); + byte[] signature = SignatureFactory.getSignature(getConf()) + .calculate(content, parse); // Ensure segment name and score are in parseData metadata - parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, - segmentName); + parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName); parseData.getContentMeta().set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature)); // Pass fetch time to content meta @@ -1039,7 +1099,8 @@ public class Fetcher extends Configured for (int i = 0; i < links.length && validCount < outlinksToStore; i++) { String toUrl = links[i].getToUrl(); - toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl, fromHost, ignoreExternalLinks, urlFilters, normalizers); + toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl, + fromHost, ignoreExternalLinks, urlFilters, normalizers); if (toUrl == null) { continue; } @@ -1052,49 +1113,57 @@ public class Fetcher extends Configured // Only process depth N outlinks if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) { - reporter.incrCounter("FetcherOutlinks", "outlinks_detected", outlinks.size()); + reporter.incrCounter("FetcherOutlinks", "outlinks_detected", + outlinks.size()); // Counter to limit num outlinks to follow per page int outlinkCounter = 0; - // Calculate variable number of outlinks by depth using the divisor (outlinks = Math.floor(divisor / depth * num.links)) - int maxOutlinksByDepth = (int)Math.floor(outlinksDepthDivisor / (outlinkDepth + 1) * maxOutlinkDepthNumLinks); + // Calculate variable number of outlinks by depth using the + // divisor (outlinks = Math.floor(divisor / depth * num.links)) + int maxOutlinksByDepth = (int) Math.floor(outlinksDepthDivisor + / (outlinkDepth + 1) * maxOutlinkDepthNumLinks); String followUrl; // Walk over the outlinks and add as new FetchItem to the queues Iterator<String> iter = outlinks.iterator(); - while(iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) { + while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) { followUrl = iter.next(); // Check whether we'll follow external outlinks if (outlinksIgnoreExternal) { - if (!URLUtil.getHost(url.toString()).equals(URLUtil.getHost(followUrl))) { + if (!URLUtil.getHost(url.toString()).equals( + URLUtil.getHost(followUrl))) { continue; } } - reporter.incrCounter("FetcherOutlinks", "outlinks_following", 1); + reporter + .incrCounter("FetcherOutlinks", "outlinks_following", 1); // Create new FetchItem with depth incremented - FetchItem fit = FetchItem.create(new Text(followUrl), new CrawlDatum(CrawlDatum.STATUS_LINKED, interval), queueMode, outlinkDepth + 1); + FetchItem fit = FetchItem.create(new Text(followUrl), + new CrawlDatum(CrawlDatum.STATUS_LINKED, interval), + queueMode, outlinkDepth + 1); fetchQueues.addFetchItem(fit); outlinkCounter++; } } - // Overwrite the outlinks in ParseData with the normalized and filtered set - parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList.size()])); + // Overwrite the outlinks in ParseData with the normalized and + // filtered set + parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList + .size()])); - output.collect(url, new NutchWritable( - new ParseImpl(new ParseText(parse.getText()), - parseData, parse.isCanonical()))); + output.collect(url, new NutchWritable(new ParseImpl(new ParseText( + parse.getText()), parseData, parse.isCanonical()))); } } } catch (IOException e) { if (LOG.isErrorEnabled()) { - LOG.error("fetcher caught:"+e.toString()); + LOG.error("fetcher caught:" + e.toString()); } } @@ -1102,7 +1171,8 @@ public class Fetcher extends Configured if (parseResult != null && !parseResult.isEmpty()) { Parse p = parseResult.get(content.getUrl()); if (p != null) { - reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p.getData().getStatus().getMajorCode()], 1); + reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p + .getData().getStatus().getMajorCode()], 1); return p.getData().getStatus(); } } @@ -1116,33 +1186,39 @@ public class Fetcher extends Configured public synchronized boolean isHalted() { return halted; } - + } - public Fetcher() { super(null); } + public Fetcher() { + super(null); + } - public Fetcher(Configuration conf) { super(conf); } + public Fetcher(Configuration conf) { + super(conf); + } private void updateStatus(int bytesInPage) throws IOException { pages.incrementAndGet(); bytes.addAndGet(bytesInPage); } - - private void reportStatus(int pagesLastSec, int bytesLastSec) throws IOException { + private void reportStatus(int pagesLastSec, int bytesLastSec) + throws IOException { StringBuilder status = new StringBuilder(); - Long elapsed = new Long((System.currentTimeMillis() - start)/1000); + Long elapsed = new Long((System.currentTimeMillis() - start) / 1000); - float avgPagesSec = (float) pages.get() / elapsed.floatValue(); - long avgBytesSec = (bytes.get() /125l) / elapsed.longValue(); + float avgPagesSec = (float) pages.get() / elapsed.floatValue(); + long avgBytesSec = (bytes.get() / 125l) / elapsed.longValue(); - status.append(activeThreads).append(" threads (").append(spinWaiting.get()).append(" waiting), "); + status.append(activeThreads).append(" threads (").append(spinWaiting.get()) + .append(" waiting), "); status.append(fetchQueues.getQueueCount()).append(" queues, "); status.append(fetchQueues.getTotalSize()).append(" URLs queued, "); status.append(pages).append(" pages, ").append(errors).append(" errors, "); status.append(String.format("%.2f", avgPagesSec)).append(" pages/s ("); status.append(pagesLastSec).append(" last sec), "); - status.append(avgBytesSec).append(" kbits/s (").append((bytesLastSec / 125)).append(" last sec)"); + status.append(avgBytesSec).append(" kbits/s (") + .append((bytesLastSec / 125)).append(" last sec)"); reporter.setStatus(status.toString()); } @@ -1154,12 +1230,13 @@ public class Fetcher extends Configured this.storingContent = isStoringContent(job); this.parsing = isParsing(job); -// if (job.getBoolean("fetcher.verbose", false)) { -// LOG.setLevel(Level.FINE); -// } + // if (job.getBoolean("fetcher.verbose", false)) { + // LOG.setLevel(Level.FINE); + // } } - public void close() {} + public void close() { + } public static boolean isParsing(Configuration conf) { return conf.getBoolean("fetcher.parse", true); @@ -1170,43 +1247,53 @@ public class Fetcher extends Configured } public void run(RecordReader<Text, CrawlDatum> input, - OutputCollector<Text, NutchWritable> output, - Reporter reporter) throws IOException { + OutputCollector<Text, NutchWritable> output, Reporter reporter) + throws IOException { this.output = output; this.reporter = reporter; this.fetchQueues = new FetchItemQueues(getConf()); int threadCount = getConf().getInt("fetcher.threads.fetch", 10); - if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); } + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: threads: " + threadCount); + } int timeoutDivisor = getConf().getInt("fetcher.threads.timeout.divisor", 2); - if (LOG.isInfoEnabled()) { LOG.info("Fetcher: time-out divisor: " + timeoutDivisor); } + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: time-out divisor: " + timeoutDivisor); + } - int queueDepthMuliplier = getConf().getInt("fetcher.queue.depth.multiplier", 50); + int queueDepthMuliplier = getConf().getInt( + "fetcher.queue.depth.multiplier", 50); - feeder = new QueueFeeder(input, fetchQueues, threadCount * queueDepthMuliplier); - //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2); + feeder = new QueueFeeder(input, fetchQueues, threadCount + * queueDepthMuliplier); + // feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2); - // the value of the time limit is either -1 or the time where it should finish + // the value of the time limit is either -1 or the time where it should + // finish long timelimit = getConf().getLong("fetcher.timelimit", -1); - if (timelimit != -1) feeder.setTimeLimit(timelimit); + if (timelimit != -1) + feeder.setTimeLimit(timelimit); feeder.start(); // set non-blocking & no-robots mode for HTTP protocol plugins. getConf().setBoolean(Protocol.CHECK_BLOCKING, false); getConf().setBoolean(Protocol.CHECK_ROBOTS, false); - for (int i = 0; i < threadCount; i++) { // spawn threads + for (int i = 0; i < threadCount; i++) { // spawn threads FetcherThread t = new FetcherThread(getConf()); fetcherThreads.add(t); t.start(); } // select a timeout that avoids a task timeout - long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/timeoutDivisor; + long timeout = getConf().getInt("mapred.task.timeout", 10 * 60 * 1000) + / timeoutDivisor; - // Used for threshold check, holds pages and bytes processed in the last second + // Used for threshold check, holds pages and bytes processed in the last + // second int pagesLastSec; int bytesLastSec; @@ -1214,57 +1301,74 @@ public class Fetcher extends Configured boolean throughputThresholdExceeded = false; int throughputThresholdNumRetries = 0; - int throughputThresholdPages = getConf().getInt("fetcher.throughput.threshold.pages", -1); - if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); } - int throughputThresholdMaxRetries = getConf().getInt("fetcher.throughput.threshold.retries", 5); - if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold retries: " + throughputThresholdMaxRetries); } - long throughputThresholdTimeLimit = getConf().getLong("fetcher.throughput.threshold.check.after", -1); - + int throughputThresholdPages = getConf().getInt( + "fetcher.throughput.threshold.pages", -1); + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); + } + int throughputThresholdMaxRetries = getConf().getInt( + "fetcher.throughput.threshold.retries", 5); + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: throughput threshold retries: " + + throughputThresholdMaxRetries); + } + long throughputThresholdTimeLimit = getConf().getLong( + "fetcher.throughput.threshold.check.after", -1); + int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 1000; int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", threadCount); - if (maxNumThreads < threadCount){ - LOG.info("fetcher.maxNum.threads can't be < than "+ threadCount + " : using "+threadCount+" instead"); + if (maxNumThreads < threadCount) { + LOG.info("fetcher.maxNum.threads can't be < than " + threadCount + + " : using " + threadCount + " instead"); maxNumThreads = threadCount; } - int bandwidthTargetCheckEveryNSecs = getConf().getInt("fetcher.bandwidth.target.check.everyNSecs", 30); - if (bandwidthTargetCheckEveryNSecs < 1){ + int bandwidthTargetCheckEveryNSecs = getConf().getInt( + "fetcher.bandwidth.target.check.everyNSecs", 30); + if (bandwidthTargetCheckEveryNSecs < 1) { LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : using 1 instead"); bandwidthTargetCheckEveryNSecs = 1; } - + int maxThreadsPerQueue = getConf().getInt("fetcher.threads.per.queue", 1); - + int bandwidthTargetCheckCounter = 0; long bytesAtLastBWTCheck = 0l; - - do { // wait for threads to exit + + do { // wait for threads to exit pagesLastSec = pages.get(); - bytesLastSec = (int)bytes.get(); + bytesLastSec = (int) bytes.get(); try { Thread.sleep(1000); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + } pagesLastSec = pages.get() - pagesLastSec; - bytesLastSec = (int)bytes.get() - bytesLastSec; + bytesLastSec = (int) bytes.get() - bytesLastSec; reporter.incrCounter("FetcherStatus", "bytes_downloaded", bytesLastSec); reportStatus(pagesLastSec, bytesLastSec); - LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get() - + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize()+ ", fetchQueues.getQueueCount="+fetchQueues.getQueueCount()); + LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + + spinWaiting.get() + ", fetchQueues.totalSize=" + + fetchQueues.getTotalSize() + ", fetchQueues.getQueueCount=" + + fetchQueues.getQueueCount()); if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) { fetchQueues.dump(); } // if throughput threshold is enabled - if (throughputThresholdTimeLimit < System.currentTimeMillis() && throughputThresholdPages != -1) { + if (throughputThresholdTimeLimit < System.currentTimeMillis() + && throughputThresholdPages != -1) { // Check if we're dropping below the threshold if (pagesLastSec < throughputThresholdPages) { throughputThresholdNumRetries++; - LOG.warn(Integer.toString(throughputThresholdNumRetries) + ": dropping below configured threshold of " + Integer.toString(throughputThresholdPages) + " pages per second"); + LOG.warn(Integer.toString(throughputThresholdNumRetries) + + ": dropping below configured threshold of " + + Integer.toString(throughputThresholdPages) + + " pages per second"); // Quit if we dropped below threshold too many times if (throughputThresholdNumRetries == throughputThresholdMaxRetries) { @@ -1273,42 +1377,55 @@ public class Fetcher extends Configured // Disable the threshold checker throughputThresholdPages = -1; - // Empty the queues cleanly and get number of items that were dropped + // Empty the queues cleanly and get number of items that were + // dropped int hitByThrougputThreshold = fetchQueues.emptyQueues(); - if (hitByThrougputThreshold != 0) reporter.incrCounter("FetcherStatus", - "hitByThrougputThreshold", hitByThrougputThreshold); + if (hitByThrougputThreshold != 0) + reporter.incrCounter("FetcherStatus", "hitByThrougputThreshold", + hitByThrougputThreshold); } } } - + // adjust the number of threads if a target bandwidth has been set - if (targetBandwidth>0) { - if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs) bandwidthTargetCheckCounter++; - else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs){ - long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)/bandwidthTargetCheckEveryNSecs; + if (targetBandwidth > 0) { + if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs) + bandwidthTargetCheckCounter++; + else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { + long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8) + / bandwidthTargetCheckEveryNSecs; bytesAtLastBWTCheck = bytes.get(); bandwidthTargetCheckCounter = 0; int averageBdwPerThread = 0; - if (activeThreads.get()>0) - averageBdwPerThread = Math.round(bpsSinceLastCheck/activeThreads.get()); + if (activeThreads.get() > 0) + averageBdwPerThread = Math.round(bpsSinceLastCheck + / activeThreads.get()); - LOG.info("averageBdwPerThread : "+(averageBdwPerThread/1000) + " kbps"); + LOG.info("averageBdwPerThread : " + (averageBdwPerThread / 1000) + + " kbps"); - if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0){ + if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0) { // check whether it is worth doing e.g. more queues than threads - if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads.get()){ - + if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads + .get()) { + long remainingBdw = targetBandwidth - bpsSinceLastCheck; - int additionalThreads = Math.round(remainingBdw/averageBdwPerThread); + int additionalThreads = Math.round(remainingBdw + / averageBdwPerThread); int availableThreads = maxNumThreads - activeThreads.get(); - // determine the number of available threads (min between availableThreads and additionalThreads) - additionalThreads = (availableThreads < additionalThreads ? availableThreads:additionalThreads); - LOG.info("Has space for more threads ("+(bpsSinceLastCheck/1000) +" vs "+(targetBandwidth/1000)+" kbps) \t=> adding "+additionalThreads+" new threads"); + // determine the number of available threads (min between + // availableThreads and additionalThreads) + additionalThreads = (availableThreads < additionalThreads ? availableThreads + : additionalThreads); + LOG.info("Has space for more threads (" + + (bpsSinceLastCheck / 1000) + " vs " + + (targetBandwidth / 1000) + " kbps) \t=> adding " + + additionalThreads + " new threads"); // activate new threads for (int i = 0; i < additionalThreads; i++) { FetcherThread thread = new FetcherThread(getConf()); @@ -1316,14 +1433,18 @@ public class Fetcher extends Configured thread.start(); } } - } - else if (bpsSinceLastCheck > targetBandwidth && averageBdwPerThread > 0){ - // if the bandwidth we're using is greater then the expected bandwidth, we have to stop some threads + } else if (bpsSinceLastCheck > targetBandwidth + && averageBdwPerThread > 0) { + // if the bandwidth we're using is greater then the expected + // bandwidth, we have to stop some threads long excessBdw = bpsSinceLastCheck - targetBandwidth; - int excessThreads = Math.round(excessBdw/averageBdwPerThread); - LOG.info("Exceeding target bandwidth ("+bpsSinceLastCheck/1000 +" vs "+(targetBandwidth/1000)+" kbps). \t=> excessThreads = "+excessThreads); + int excessThreads = Math.round(excessBdw / averageBdwPerThread); + LOG.info("Exceeding target bandwidth (" + bpsSinceLastCheck / 1000 + + " vs " + (targetBandwidth / 1000) + + " kbps). \t=> excessThreads = " + excessThreads); // keep at least one - if (excessThreads >= fetcherThreads.size()) excessThreads = 0; + if (excessThreads >= fetcherThreads.size()) + excessThreads = 0; // de-activates threads for (int i = 0; i < excessThreads; i++) { FetcherThread thread = fetcherThreads.removeLast(); @@ -1336,18 +1457,20 @@ public class Fetcher extends Configured // check timelimit if (!feeder.isAlive()) { int hitByTimeLimit = fetchQueues.checkTimelimit(); - if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus", - "hitByTimeLimit", hitByTimeLimit); + if (hitByTimeLimit != 0) + reporter.incrCounter("FetcherStatus", "hitByTimeLimit", + hitByTimeLimit); } // some requests seem to hang, despite all intentions if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) { if (LOG.isWarnEnabled()) { - LOG.warn("Aborting with "+activeThreads+" hung threads."); + LOG.warn("Aborting with " + activeThreads + " hung threads."); for (int i = 0; i < fetcherThreads.size(); i++) { FetcherThread thread = fetcherThreads.get(i); if (thread.isAlive()) { - LOG.warn("Thread #" + i + " hung while processing " + thread.reprUrl); + LOG.warn("Thread #" + i + " hung while processing " + + thread.reprUrl); if (LOG.isDebugEnabled()) { StackTraceElement[] stack = thread.getStackTrace(); StringBuilder sb = new StringBuilder(); @@ -1368,8 +1491,7 @@ public class Fetcher extends Configured } - public void fetch(Path segment, int threads) - throws IOException { + public void fetch(Path segment, int threads) throws IOException { checkConfiguration(); @@ -1390,24 +1512,31 @@ public class Fetcher extends Configured getConf().setLong("fetcher.timelimit", timelimit); } - // Set the time limit after which the throughput threshold feature is enabled - timelimit = getConf().getLong("fetcher.throughput.threshold.check.after", 10); + // Set the time limit after which the throughput threshold feature is + // enabled + timelimit = getConf().getLong("fetcher.throughput.threshold.check.after", + 10); timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000); getConf().setLong("fetcher.throughput.threshold.check.after", timelimit); int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1); if (maxOutlinkDepth > 0) { - LOG.info("Fetcher: following outlinks up to depth: " + Integer.toString(maxOutlinkDepth)); + LOG.info("Fetcher: following outlinks up to depth: " + + Integer.toString(maxOutlinkDepth)); - int maxOutlinkDepthNumLinks = getConf().getInt("fetcher.follow.outlinks.num.links", 4); - int outlinksDepthDivisor = getConf().getInt("fetcher.follow.outlinks.depth.divisor", 2); + int maxOutlinkDepthNumLinks = getConf().getInt( + "fetcher.follow.outlinks.num.links", 4); + int outlinksDepthDivisor = getConf().getInt( + "fetcher.follow.outlinks.depth.divisor", 2); int totalOutlinksToFollow = 0; for (int i = 0; i < maxOutlinkDepth; i++) { - totalOutlinksToFollow += (int)Math.floor(outlinksDepthDivisor / (i + 1) * maxOutlinkDepthNumLinks); + totalOutlinksToFollow += (int) Math.floor(outlinksDepthDivisor + / (i + 1) * maxOutlinkDepthNumLinks); } - LOG.info("Fetcher: maximum outlinks to follow: " + Integer.toString(totalOutlinksToFollow)); + LOG.info("Fetcher: maximum outlinks to follow: " + + Integer.toString(totalOutlinksToFollow)); } JobConf job = new NutchJob(getConf()); @@ -1419,7 +1548,8 @@ public class Fetcher extends Configured // for politeness, don't permit parallel execution of a single task job.setSpeculativeExecution(false); - FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME)); + FileInputFormat.addInputPath(job, new Path(segment, + CrawlDatum.GENERATE_DIR_NAME)); job.setInputFormat(InputFormat.class); job.setMapRunnerClass(Fetcher.class); @@ -1432,10 +1562,10 @@ public class Fetcher extends Configured JobClient.runJob(job); long end = System.currentTimeMillis(); - LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end)); + LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); } - /** Run the fetcher. */ public static void main(String[] args) throws Exception { int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(), args); @@ -1456,9 +1586,9 @@ public class Fetcher extends Configured int threads = getConf().getInt("fetcher.threads.fetch", 10); boolean parsing = false; - for (int i = 1; i < args.length; i++) { // parse command line - if (args[i].equals("-threads")) { // found -threads option - threads = Integer.parseInt(args[++i]); + for (int i = 1; i < args.length; i++) { // parse command line + if (args[i].equals("-threads")) { // found -threads option + threads = Integer.parseInt(args[++i]); } }
Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java (original) +++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java Thu Jan 29 05:38:59 2015 @@ -48,74 +48,68 @@ public class FetcherOutputFormat impleme public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException { Path out = FileOutputFormat.getOutputPath(job); if ((out == null) && (job.getNumReduceTasks() != 0)) { - throw new InvalidJobConfException( - "Output directory not set in JobConf."); + throw new InvalidJobConfException("Output directory not set in JobConf."); } if (fs == null) { - fs = out.getFileSystem(job); + fs = out.getFileSystem(job); } if (fs.exists(new Path(out, CrawlDatum.FETCH_DIR_NAME))) - throw new IOException("Segment already fetched!"); + throw new IOException("Segment already fetched!"); } public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs, - final JobConf job, - final String name, - final Progressable progress) throws IOException { + final JobConf job, final String name, final Progressable progress) + throws IOException { Path out = FileOutputFormat.getOutputPath(job); - final Path fetch = - new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name); - final Path content = - new Path(new Path(out, Content.DIR_NAME), name); - - final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job); - - final MapFile.Writer fetchOut = - new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class, - compType, progress); - + final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name); + final Path content = new Path(new Path(out, Content.DIR_NAME), name); + + final CompressionType compType = SequenceFileOutputFormat + .getOutputCompressionType(job); + + final MapFile.Writer fetchOut = new MapFile.Writer(job, fs, + fetch.toString(), Text.class, CrawlDatum.class, compType, progress); + return new RecordWriter<Text, NutchWritable>() { - private MapFile.Writer contentOut; - private RecordWriter<Text, Parse> parseOut; + private MapFile.Writer contentOut; + private RecordWriter<Text, Parse> parseOut; - { - if (Fetcher.isStoringContent(job)) { - contentOut = new MapFile.Writer(job, fs, content.toString(), - Text.class, Content.class, - compType, progress); - } - - if (Fetcher.isParsing(job)) { - parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress); - } + { + if (Fetcher.isStoringContent(job)) { + contentOut = new MapFile.Writer(job, fs, content.toString(), + Text.class, Content.class, compType, progress); } - public void write(Text key, NutchWritable value) - throws IOException { - - Writable w = value.get(); - - if (w instanceof CrawlDatum) - fetchOut.append(key, w); - else if (w instanceof Content && contentOut != null) - contentOut.append(key, w); - else if (w instanceof Parse && parseOut != null) - parseOut.write(key, (Parse)w); + if (Fetcher.isParsing(job)) { + parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, + progress); } + } + + public void write(Text key, NutchWritable value) throws IOException { - public void close(Reporter reporter) throws IOException { - fetchOut.close(); - if (contentOut != null) { - contentOut.close(); - } - if (parseOut != null) { - parseOut.close(reporter); - } + Writable w = value.get(); + + if (w instanceof CrawlDatum) + fetchOut.append(key, w); + else if (w instanceof Content && contentOut != null) + contentOut.append(key, w); + else if (w instanceof Parse && parseOut != null) + parseOut.write(key, (Parse) w); + } + + public void close(Reporter reporter) throws IOException { + fetchOut.close(); + if (contentOut != null) { + contentOut.close(); + } + if (parseOut != null) { + parseOut.close(reporter); } + } - }; + }; - } + } } -