Author: jnioche Date: Sat May 10 12:39:11 2014 New Revision: 1593694 URL: http://svn.apache.org/r1593694 Log: NUTCH-207 Bandwidth target for fetcher rather than a thread count
Modified: nutch/trunk/CHANGES.txt nutch/trunk/conf/nutch-default.xml nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Modified: nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1593694&r1=1593693&r2=1593694&view=diff ============================================================================== --- nutch/trunk/CHANGES.txt (original) +++ nutch/trunk/CHANGES.txt Sat May 10 12:39:11 2014 @@ -2,6 +2,8 @@ Nutch Change Log Nutch Current Development +* NUTCH-207 Bandwidth target for fetcher rather than a thread count (jnioche) + * NUTCH-1182 fetcher to log hung threads (snagel) * NUTCH-1759 Upgrade to Crawler Commons 0.4 (jnioche) Modified: nutch/trunk/conf/nutch-default.xml URL: http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1593694&r1=1593693&r2=1593694&view=diff ============================================================================== --- nutch/trunk/conf/nutch-default.xml (original) +++ nutch/trunk/conf/nutch-default.xml Sat May 10 12:39:11 2014 @@ -838,6 +838,28 @@ </description> </property> +<property> + <name>fetcher.bandwidth.target</name> + <value>-1</value> + <description>Target bandwidth in kilobits per sec for each mapper instance. This is used to adjust the number of + fetching threads automatically (up to fetcher.maxNum.threads). A value of -1 deactivates the functionality, in which case + the number of fetching threads is fixed (see fetcher.threads.fetch).</description> +</property> + +<property> + <name>fetcher.maxNum.threads</name> + <value>25</value> + <description>Max number of fetch threads allowed when using fetcher.bandwidth.target. Defaults to fetcher.threads.fetch if unspecified or + set to a value lower than it. 
</description> +</property> + +<property> + <name>fetcher.bandwidth.target.check.everyNSecs</name> + <value>30</value> + <description>(EXPERT) Value in seconds which determines how frequently we should reassess the optimal number of fetch threads when using + fetcher.bandwidth.target. Defaults to 30 and must be at least 1.</description> +</property> + <!-- moreindexingfilter plugin properties --> <property> Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=1593694&r1=1593693&r2=1593694&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original) +++ nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Sat May 10 12:39:11 2014 @@ -595,6 +595,8 @@ public class Fetcher extends Configured private int outlinksDepthDivisor; private boolean skipTruncated; + + private boolean halted = false; public FetcherThread(Configuration conf) { this.setDaemon(true); // don't hang JVM on exit @@ -637,6 +639,13 @@ public class Fetcher extends Configured try { while (true) { + // check whether must be stopped + if (isHalted()) { + LOG.debug(getName() + " set to halted"); + fit = null; + return; + } + fit = fetchQueues.getFetchItem(); if (fit == null) { if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) { @@ -650,6 +659,7 @@ public class Fetcher extends Configured continue; } else { // all done, finish this thread + LOG.info("Thread " + getName() + " has no more work available"); return; } } @@ -666,8 +676,8 @@ public class Fetcher extends Configured redirecting = false; redirectCount = 0; do { - if (LOG.isInfoEnabled()) { - LOG.info("fetching " + fit.url + " (queue crawl delay=" + + if (LOG.isDebugEnabled()) { + LOG.debug("fetching " + fit.url + " (queue crawl delay=" + fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay + "ms)"); } if (LOG.isDebugEnabled()) { 
@@ -1099,6 +1109,14 @@ public class Fetcher extends Configured return null; } + public synchronized void setHalted(boolean halted) { + this.halted = halted; + } + + public synchronized boolean isHalted() { + return halted; + } + } public Fetcher() { super(null); } @@ -1201,7 +1219,24 @@ public class Fetcher extends Configured int throughputThresholdMaxRetries = getConf().getInt("fetcher.throughput.threshold.retries", 5); if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold retries: " + throughputThresholdMaxRetries); } long throughputThresholdTimeLimit = getConf().getLong("fetcher.throughput.threshold.check.after", -1); - + + int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 1000; + int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", threadCount); + if (maxNumThreads < threadCount){ + LOG.info("fetcher.maxNum.threads can't be < than "+ threadCount + " : using "+threadCount+" instead"); + maxNumThreads = threadCount; + } + int bandwidthTargetCheckEveryNSecs = getConf().getInt("fetcher.bandwidth.target.check.everyNSecs", 30); + if (bandwidthTargetCheckEveryNSecs < 1){ + LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : using 1 instead"); + bandwidthTargetCheckEveryNSecs = 1; + } + + int maxThreadsPerQueue = getConf().getInt("fetcher.threads.per.queue", 1); + + int bandwidthTargetCheckCounter = 0; + long bytesAtLastBWTCheck = 0l; + do { // wait for threads to exit pagesLastSec = pages.get(); bytesLastSec = (int)bytes.get(); @@ -1218,7 +1253,7 @@ public class Fetcher extends Configured reportStatus(pagesLastSec, bytesLastSec); LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get() - + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize()); + + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize()+ ", fetchQueues.getQueueCount="+fetchQueues.getQueueCount()); if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) { fetchQueues.dump(); @@ -1246,6 +1281,57 @@ public class 
Fetcher extends Configured } } } + + // adjust the number of threads if a target bandwidth has been set + if (targetBandwidth>0) { + if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs) bandwidthTargetCheckCounter++; + else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs){ + long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)/bandwidthTargetCheckEveryNSecs; + + bytesAtLastBWTCheck = bytes.get(); + bandwidthTargetCheckCounter = 0; + + int averageBdwPerThread = 0; + if (activeThreads.get()>0) + averageBdwPerThread = Math.round(bpsSinceLastCheck/activeThreads.get()); + + LOG.info("averageBdwPerThread : "+(averageBdwPerThread/1000) + " kbps"); + + if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0){ + // check whether it is worth doing e.g. more queues than threads + + if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads.get()){ + + long remainingBdw = targetBandwidth - bpsSinceLastCheck; + int additionalThreads = Math.round(remainingBdw/averageBdwPerThread); + int availableThreads = maxNumThreads - activeThreads.get(); + + // determine the number of available threads (min between availableThreads and additionalThreads) + additionalThreads = (availableThreads < additionalThreads ? 
availableThreads:additionalThreads); + LOG.info("Has space for more threads ("+(bpsSinceLastCheck/1000) +" vs "+(targetBandwidth/1000)+" kbps) \t=> adding "+additionalThreads+" new threads"); + // activate new threads + for (int i = 0; i < additionalThreads; i++) { + FetcherThread thread = new FetcherThread(getConf()); + fetcherThreads.add(thread); + thread.start(); + } + } + } + else if (bpsSinceLastCheck > targetBandwidth && averageBdwPerThread > 0){ + // if the bandwidth we're using is greater than the expected bandwidth, we have to stop some threads + long excessBdw = bpsSinceLastCheck - targetBandwidth; + int excessThreads = Math.round(excessBdw/averageBdwPerThread); + LOG.info("Exceeding target bandwidth ("+bpsSinceLastCheck/1000 +" vs "+(targetBandwidth/1000)+" kbps). \t=> excessThreads = "+excessThreads); + // keep at least one + if (excessThreads >= fetcherThreads.size()) excessThreads = 0; + // de-activates threads + for (int i = 0; i < excessThreads; i++) { + FetcherThread thread = fetcherThreads.removeLast(); + thread.setHalted(true); + } + } + } + } // check timelimit if (!feeder.isAlive()) {