Author: jnioche
Date: Sat May 10 12:39:11 2014
New Revision: 1593694

URL: http://svn.apache.org/r1593694
Log:
NUTCH-207 Bandwidth target for fetcher rather than a thread count

Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/conf/nutch-default.xml
    nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java

Modified: nutch/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1593694&r1=1593693&r2=1593694&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Sat May 10 12:39:11 2014
@@ -2,6 +2,8 @@ Nutch Change Log
 
 Nutch Current Development
 
+* NUTCH-207 Bandwidth target for fetcher rather than a thread count (jnioche)
+
 * NUTCH-1182 fetcher to log hung threads (snagel)
 
 * NUTCH-1759 Upgrade to Crawler Commons 0.4 (jnioche)

Modified: nutch/trunk/conf/nutch-default.xml
URL: 
http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1593694&r1=1593693&r2=1593694&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Sat May 10 12:39:11 2014
@@ -838,6 +838,28 @@
   </description>
 </property>
 
+<property>
+  <name>fetcher.bandwidth.target</name>
+  <value>-1</value>  
+  <description>Target bandwidth in kilobits per second for each mapper instance. This is used to adjust the number of
+  fetching threads automatically (up to fetcher.maxNum.threads). A value of -1 (or any value of 0 or less) deactivates
+  this functionality, in which case the number of fetching threads is fixed (see fetcher.threads.fetch).</description>
+</property>
+
+<property>
+  <name>fetcher.maxNum.threads</name>
+  <value>25</value>  
+  <description>Maximum number of fetch threads allowed when using fetcher.bandwidth.target. If unspecified, or set
+  to a value lower than fetcher.threads.fetch, the value of fetcher.threads.fetch is used instead.</description>
+</property>
+
+<property>
+  <name>fetcher.bandwidth.target.check.everyNSecs</name>
+  <value>30</value>  
+  <description>(EXPERT) Interval in seconds at which the optimal number of fetch threads is reassessed when using
+   fetcher.bandwidth.target. Defaults to 30; values below 1 are replaced by 1.</description>
+</property>
+
 <!-- moreindexingfilter plugin properties -->
 
 <property>

Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=1593694&r1=1593693&r2=1593694&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Sat May 10 
12:39:11 2014
@@ -595,6 +595,8 @@ public class Fetcher extends Configured 
 
     private int outlinksDepthDivisor;
     private boolean skipTruncated;
+    
+    private boolean halted = false;
 
     public FetcherThread(Configuration conf) {
       this.setDaemon(true);                       // don't hang JVM on exit
@@ -637,6 +639,13 @@ public class Fetcher extends Configured 
       try {
 
         while (true) {
+          // check whether must be stopped
+          if (isHalted()) {
+            LOG.debug(getName() + " set to halted");
+            fit = null;
+            return;
+          }
+          
           fit = fetchQueues.getFetchItem();
           if (fit == null) {
             if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
@@ -650,6 +659,7 @@ public class Fetcher extends Configured 
               continue;
             } else {
               // all done, finish this thread
+              LOG.info("Thread " + getName() + " has no more work available");
               return;
             }
           }
@@ -666,8 +676,8 @@ public class Fetcher extends Configured 
             redirecting = false;
             redirectCount = 0;
             do {
-              if (LOG.isInfoEnabled()) {
-                LOG.info("fetching " + fit.url + " (queue crawl delay=" + 
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("fetching " + fit.url + " (queue crawl delay=" + 
                          fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay 
+ "ms)"); 
               }
               if (LOG.isDebugEnabled()) {
@@ -1099,6 +1109,14 @@ public class Fetcher extends Configured 
       return null;
     }
 
+    public synchronized void setHalted(boolean halted) {
+      this.halted = halted;
+    }
+
+    public synchronized boolean isHalted() {
+      return halted;
+    }
+    
   }
 
   public Fetcher() { super(null); }
@@ -1201,7 +1219,24 @@ public class Fetcher extends Configured 
     int throughputThresholdMaxRetries = 
getConf().getInt("fetcher.throughput.threshold.retries", 5);
     if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold 
retries: " + throughputThresholdMaxRetries); }
     long throughputThresholdTimeLimit = 
getConf().getLong("fetcher.throughput.threshold.check.after", -1);
-
+    
+    int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 
1000;
+    int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", 
threadCount);
+    if (maxNumThreads < threadCount){
+      LOG.info("fetcher.maxNum.threads can't be < than "+ threadCount + " : 
using "+threadCount+" instead");
+      maxNumThreads = threadCount;
+    }
+    int bandwidthTargetCheckEveryNSecs  = 
getConf().getInt("fetcher.bandwidth.target.check.everyNSecs", 30);
+    if (bandwidthTargetCheckEveryNSecs < 1){
+      LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : 
using 1 instead");
+      bandwidthTargetCheckEveryNSecs = 1;
+    }
+    
+    int maxThreadsPerQueue = getConf().getInt("fetcher.threads.per.queue", 1);
+    
+    int bandwidthTargetCheckCounter = 0;
+    long bytesAtLastBWTCheck = 0l;
+    
     do {                                          // wait for threads to exit
       pagesLastSec = pages.get();
       bytesLastSec = (int)bytes.get();
@@ -1218,7 +1253,7 @@ public class Fetcher extends Configured 
       reportStatus(pagesLastSec, bytesLastSec);
 
       LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + 
spinWaiting.get()
-          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize()+ ", 
fetchQueues.getQueueCount="+fetchQueues.getQueueCount());
 
       if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
         fetchQueues.dump();
@@ -1246,6 +1281,57 @@ public class Fetcher extends Configured 
           }
         }
       }
+      
+      // adjust the number of threads if a target bandwidth has been set
+      if (targetBandwidth>0) {
+        if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs) 
bandwidthTargetCheckCounter++;
+        else if (bandwidthTargetCheckCounter == 
bandwidthTargetCheckEveryNSecs){       
+          long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 
8)/bandwidthTargetCheckEveryNSecs;
+
+          bytesAtLastBWTCheck = bytes.get();
+          bandwidthTargetCheckCounter = 0;
+
+          int averageBdwPerThread = 0;
+          if (activeThreads.get()>0)
+            averageBdwPerThread = 
Math.round(bpsSinceLastCheck/activeThreads.get());   
+
+          LOG.info("averageBdwPerThread : "+(averageBdwPerThread/1000) + " 
kbps");
+
+          if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0){
+            // check whether it is worth doing e.g. more queues than threads
+
+            if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > 
activeThreads.get()){
+             
+              long remainingBdw = targetBandwidth - bpsSinceLastCheck;
+              int additionalThreads = 
Math.round(remainingBdw/averageBdwPerThread);
+              int availableThreads = maxNumThreads - activeThreads.get();
+
+              // determine the number of available threads (min between 
availableThreads and additionalThreads)
+              additionalThreads = (availableThreads < additionalThreads ? 
availableThreads:additionalThreads);
+              LOG.info("Has space for more threads ("+(bpsSinceLastCheck/1000) 
+" vs "+(targetBandwidth/1000)+" kbps) \t=> adding "+additionalThreads+" new 
threads");
+              // activate new threads
+              for (int i = 0; i < additionalThreads; i++) {
+                FetcherThread thread = new FetcherThread(getConf());
+                fetcherThreads.add(thread);
+                thread.start();
+              }
+            }
+          }
+          else if (bpsSinceLastCheck > targetBandwidth && averageBdwPerThread 
> 0){
+            // if the bandwidth we're using is greater then the expected 
bandwidth, we have to stop some threads
+            long excessBdw = bpsSinceLastCheck - targetBandwidth;
+            int excessThreads = Math.round(excessBdw/averageBdwPerThread);
+            LOG.info("Exceeding target bandwidth ("+bpsSinceLastCheck/1000 +" 
vs "+(targetBandwidth/1000)+" kbps). \t=> excessThreads = "+excessThreads);
+            // keep at least one
+            if (excessThreads >= fetcherThreads.size()) excessThreads = 0;
+            // de-activates threads
+            for (int i = 0; i < excessThreads; i++) {
+              FetcherThread thread = fetcherThreads.removeLast();
+              thread.setHalted(true);
+            }
+          }
+        }
+      }
 
       // check timelimit
       if (!feeder.isAlive()) {


Reply via email to