Author: lewismc
Date: Fri May  8 04:25:05 2015
New Revision: 1678281

URL: http://svn.apache.org/r1678281
Log:
NUTCH-1934 Refactor Fetcher in trunk

Added:
    nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java
    nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
    nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
    nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java
    nutch/trunk/src/java/org/apache/nutch/fetcher/QueueFeeder.java
Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1678281&r1=1678280&r2=1678281&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri May  8 04:25:05 2015
@@ -2,6 +2,8 @@ Nutch Change Log
  
 Nutch Current Development 1.11-SNAPSHOT
 
+* NUTCH-1934 Refactor Fetcher in trunk (lewismc)
+
 * NUTCH-2004 ParseChecker does not handle redirects (mjoyce via lewismc)
 
 Nutch 1.10 Release - 29/04/2015 (dd/mm/yyyy)

Added: nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java?rev=1678281&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java Fri May  8 04:25:05 2015
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This class describes the item to be fetched.
+ */
+public class FetchItem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FetchItem.class);
+
+  int outlinkDepth = 0;
+  String queueID;
+  Text url;
+  URL u;
+  CrawlDatum datum;
+
+  public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
+    this(url, u, datum, queueID, 0);
+  }
+
+  public FetchItem(Text url, URL u, CrawlDatum datum, String queueID,
+      int outlinkDepth) {
+    this.url = url;
+    this.u = u;
+    this.datum = datum;
+    this.queueID = queueID;
+    this.outlinkDepth = outlinkDepth;
+  }
+
+  /**
+   * Create an item. Queue id will be created based on <code>queueMode</code>
+   * argument, either as a protocol + hostname pair, protocol + IP address
+   * pair or protocol+domain pair.
+   */
+  public static FetchItem create(Text url, CrawlDatum datum, String queueMode) {
+    return create(url, datum, queueMode, 0);
+  }
+
+  public static FetchItem create(Text url, CrawlDatum datum,
+      String queueMode, int outlinkDepth) {
+    String queueID;
+    URL u = null;
+    try {
+      u = new URL(url.toString());
+    } catch (Exception e) {
+      LOG.warn("Cannot parse url: " + url, e);
+      return null;
+    }
+    final String proto = u.getProtocol().toLowerCase();
+    String key;
+    if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
+      try {
+        final InetAddress addr = InetAddress.getByName(u.getHost());
+        key = addr.getHostAddress();
+      } catch (final UnknownHostException e) {
+        // unable to resolve it, so don't fall back to host name
+        LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
+        return null;
+      }
+    } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) {
+      key = URLUtil.getDomainName(u);
+      if (key == null) {
+        LOG.warn("Unknown domain for url: " + url
+            + ", using URL string as key");
+        key = u.toExternalForm();
+      }
+    } else {
+      key = u.getHost();
+      if (key == null) {
+        LOG.warn("Unknown host for url: " + url + ", using URL string as key");
+        key = u.toExternalForm();
+      }
+    }
+    queueID = proto + "://" + key.toLowerCase();
+    return new FetchItem(url, u, datum, queueID, outlinkDepth);
+  }
+
+  public CrawlDatum getDatum() {
+    return datum;
+  }
+
+  public String getQueueID() {
+    return queueID;
+  }
+
+  public Text getUrl() {
+    return url;
+  }
+
+  public URL getURL2() {
+    return u;
+  }
+}
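
FetchItem.create() derives the queue ID from the URL's protocol plus a host, domain, or IP key, depending on the queue mode passed in, and returns null when the URL cannot be parsed (or, in byIP mode, the host cannot be resolved). A minimal sketch of exercising the extracted class; the demo class name and URL below are illustrative only, not part of this commit:

import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.fetcher.FetchItem;
import org.apache.nutch.fetcher.FetchItemQueues;

public class FetchItemQueueIdDemo {
  public static void main(String[] args) {
    Text url = new Text("http://news.example.com/index.html");
    CrawlDatum datum = new CrawlDatum();

    // byHost: queue ID becomes protocol + host, e.g. "http://news.example.com"
    FetchItem byHost = FetchItem.create(url, datum, FetchItemQueues.QUEUE_MODE_HOST);

    // byDomain: queue ID becomes protocol + registered domain, e.g. "http://example.com"
    FetchItem byDomain = FetchItem.create(url, datum, FetchItemQueues.QUEUE_MODE_DOMAIN);

    // create() returns null for unparseable URLs, so guard before use
    if (byHost != null && byDomain != null) {
      System.out.println(byHost.getQueueID());
      System.out.println(byDomain.getQueueID());
    }
  }
}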

Added: nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java?rev=1678281&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java Fri May  8 04:25:05 2015
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles FetchItems which come from the same host ID (be it a
+ * proto/hostname or proto/IP pair). It also keeps track of requests in
+ * progress and elapsed time between requests.
+ */
+public class FetchItemQueue {
+  
+  private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueues.class);
+
+  List<FetchItem> queue = Collections
+      .synchronizedList(new LinkedList<FetchItem>());
+  AtomicInteger inProgress = new AtomicInteger();
+  AtomicLong nextFetchTime = new AtomicLong();
+  AtomicInteger exceptionCounter = new AtomicInteger();
+  long crawlDelay;
+  long minCrawlDelay;
+  int maxThreads;
+  Configuration conf;
+
+  public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
+      long minCrawlDelay) {
+    this.conf = conf;
+    this.maxThreads = maxThreads;
+    this.crawlDelay = crawlDelay;
+    this.minCrawlDelay = minCrawlDelay;
+    // ready to start
+    setEndTime(System.currentTimeMillis() - crawlDelay);
+  }
+
+  public synchronized int emptyQueue() {
+    int presize = queue.size();
+    queue.clear();
+    return presize;
+  }
+
+  public int getQueueSize() {
+    return queue.size();
+  }
+
+  public int getInProgressSize() {
+    return inProgress.get();
+  }
+
+  public int incrementExceptionCounter() {
+    return exceptionCounter.incrementAndGet();
+  }
+
+  public void finishFetchItem(FetchItem it, boolean asap) {
+    if (it != null) {
+      inProgress.decrementAndGet();
+      setEndTime(System.currentTimeMillis(), asap);
+    }
+  }
+
+  public void addFetchItem(FetchItem it) {
+    if (it == null)
+      return;
+    queue.add(it);
+  }
+
+  public void addInProgressFetchItem(FetchItem it) {
+    if (it == null)
+      return;
+    inProgress.incrementAndGet();
+  }
+
+  public FetchItem getFetchItem() {
+    if (inProgress.get() >= maxThreads)
+      return null;
+    long now = System.currentTimeMillis();
+    if (nextFetchTime.get() > now)
+      return null;
+    FetchItem it = null;
+    if (queue.size() == 0)
+      return null;
+    try {
+      it = queue.remove(0);
+      inProgress.incrementAndGet();
+    } catch (Exception e) {
+      LOG.error(
+          "Cannot remove FetchItem from queue or cannot add it to inProgress 
queue",
+          e);
+    }
+    return it;
+  }
+
+  public synchronized void dump() {
+    LOG.info("  maxThreads    = " + maxThreads);
+    LOG.info("  inProgress    = " + inProgress.get());
+    LOG.info("  crawlDelay    = " + crawlDelay);
+    LOG.info("  minCrawlDelay = " + minCrawlDelay);
+    LOG.info("  nextFetchTime = " + nextFetchTime.get());
+    LOG.info("  now           = " + System.currentTimeMillis());
+    for (int i = 0; i < queue.size(); i++) {
+      FetchItem it = queue.get(i);
+      LOG.info("  " + i + ". " + it.url);
+    }
+  }
+
+  private void setEndTime(long endTime) {
+    setEndTime(endTime, false);
+  }
+
+  private void setEndTime(long endTime, boolean asap) {
+    if (!asap)
+      nextFetchTime.set(endTime
+          + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
+    else
+      nextFetchTime.set(endTime);
+  }
+}
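
FetchItemQueue enforces per-queue politeness: getFetchItem() returns null while maxThreads requests are in flight or until nextFetchTime has elapsed, and finishFetchItem() pushes nextFetchTime forward by crawlDelay (or minCrawlDelay when more than one thread per queue is allowed). A minimal single-queue sketch of that contract; the demo class, URLs, and delay values are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.fetcher.FetchItem;
import org.apache.nutch.fetcher.FetchItemQueue;
import org.apache.nutch.fetcher.FetchItemQueues;

public class FetchItemQueueDemo {
  public static void main(String[] args) throws InterruptedException {
    Configuration conf = new Configuration();
    // 1 thread per queue, 1000 ms between requests, no separate minimum delay
    FetchItemQueue fiq = new FetchItemQueue(conf, 1, 1000L, 0L);

    fiq.addFetchItem(FetchItem.create(new Text("http://example.com/a"),
        new CrawlDatum(), FetchItemQueues.QUEUE_MODE_HOST));
    fiq.addFetchItem(FetchItem.create(new Text("http://example.com/b"),
        new CrawlDatum(), FetchItemQueues.QUEUE_MODE_HOST));

    while (fiq.getQueueSize() > 0 || fiq.getInProgressSize() > 0) {
      FetchItem it = fiq.getFetchItem();   // null while the crawl delay has not elapsed
      if (it == null) {
        Thread.sleep(100);                 // spin-wait, as FetcherThread does
        continue;
      }
      System.out.println("fetching " + it.getUrl());
      fiq.finishFetchItem(it, false);      // schedules the next slot one crawlDelay later
    }
  }
}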

Added: nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java?rev=1678281&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java Fri May  8 04:25:05 2015
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convenience class - a collection of queues that keeps track of the total
+ * number of items, and provides items eligible for fetching from any queue.
+ */
+public class FetchItemQueues {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueues.class);
+  
+  public static final String DEFAULT_ID = "default";
+  Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
+  AtomicInteger totalSize = new AtomicInteger(0);
+  int maxThreads;
+  long crawlDelay;
+  long minCrawlDelay;
+  long timelimit = -1;
+  int maxExceptionsPerQueue = -1;
+  Configuration conf;
+
+  public static final String QUEUE_MODE_HOST = "byHost";
+  public static final String QUEUE_MODE_DOMAIN = "byDomain";
+  public static final String QUEUE_MODE_IP = "byIP";
+
+  String queueMode;
+
+  public FetchItemQueues(Configuration conf) {
+    this.conf = conf;
+    this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
+    queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
+    // check that the mode is known
+    if (!queueMode.equals(QUEUE_MODE_IP)
+        && !queueMode.equals(QUEUE_MODE_DOMAIN)
+        && !queueMode.equals(QUEUE_MODE_HOST)) {
+      LOG.error("Unknown partition mode : " + queueMode
+          + " - forcing to byHost");
+      queueMode = QUEUE_MODE_HOST;
+    }
+    LOG.info("Using queue mode : " + queueMode);
+
+    this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
+    this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
+        0.0f) * 1000);
+    this.timelimit = conf.getLong("fetcher.timelimit", -1);
+    this.maxExceptionsPerQueue = conf.getInt(
+        "fetcher.max.exceptions.per.queue", -1);
+  }
+
+  public int getTotalSize() {
+    return totalSize.get();
+  }
+
+  public int getQueueCount() {
+    return queues.size();
+  }
+
+  public void addFetchItem(Text url, CrawlDatum datum) {
+    FetchItem it = FetchItem.create(url, datum, queueMode);
+    if (it != null)
+      addFetchItem(it);
+  }
+
+  public synchronized void addFetchItem(FetchItem it) {
+    FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+    fiq.addFetchItem(it);
+    totalSize.incrementAndGet();
+  }
+
+  public void finishFetchItem(FetchItem it) {
+    finishFetchItem(it, false);
+  }
+
+  public void finishFetchItem(FetchItem it, boolean asap) {
+    FetchItemQueue fiq = queues.get(it.queueID);
+    if (fiq == null) {
+      LOG.warn("Attempting to finish item from unknown queue: " + it);
+      return;
+    }
+    fiq.finishFetchItem(it, asap);
+  }
+
+  public synchronized FetchItemQueue getFetchItemQueue(String id) {
+    FetchItemQueue fiq = queues.get(id);
+    if (fiq == null) {
+      // initialize queue
+      fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+      queues.put(id, fiq);
+    }
+    return fiq;
+  }
+
+  public synchronized FetchItem getFetchItem() {
+    Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
+        .iterator();
+    while (it.hasNext()) {
+      FetchItemQueue fiq = it.next().getValue();
+      // reap empty queues
+      if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+        it.remove();
+        continue;
+      }
+      FetchItem fit = fiq.getFetchItem();
+      if (fit != null) {
+        totalSize.decrementAndGet();
+        return fit;
+      }
+    }
+    return null;
+  }
+
+  // called only once the feeder has stopped
+  public synchronized int checkTimelimit() {
+    int count = 0;
+
+    if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+      // emptying the queues
+      count = emptyQueues();
+
+      // there might also be a case where totalsize !=0 but number of queues
+      // == 0
+      // in which case we simply force it to 0 to avoid blocking
+      if (totalSize.get() != 0 && queues.size() == 0)
+        totalSize.set(0);
+    }
+    return count;
+  }
+
+  // empties the queues (used by timebomb and throughput threshold)
+  public synchronized int emptyQueues() {
+    int count = 0;
+
+    for (String id : queues.keySet()) {
+      FetchItemQueue fiq = queues.get(id);
+      if (fiq.getQueueSize() == 0)
+        continue;
+      LOG.info("* queue: " + id + " >> dropping! ");
+      int deleted = fiq.emptyQueue();
+      for (int i = 0; i < deleted; i++) {
+        totalSize.decrementAndGet();
+      }
+      count += deleted;
+    }
+
+    return count;
+  }
+
+  /**
+   * Increment the exception counter of a queue in case of an exception e.g.
+   * timeout; when higher than a given threshold simply empty the queue.
+   * 
+   * @param queueid
+   * @return number of purged items
+   */
+  public synchronized int checkExceptionThreshold(String queueid) {
+    FetchItemQueue fiq = queues.get(queueid);
+    if (fiq == null) {
+      return 0;
+    }
+    if (fiq.getQueueSize() == 0) {
+      return 0;
+    }
+    int excCount = fiq.incrementExceptionCounter();
+    if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
+      // too many exceptions for items in this queue - purge it
+      int deleted = fiq.emptyQueue();
+      LOG.info("* queue: " + queueid + " >> removed " + deleted
+          + " URLs from queue because " + excCount + " exceptions occurred");
+      for (int i = 0; i < deleted; i++) {
+        totalSize.decrementAndGet();
+      }
+      return deleted;
+    }
+    return 0;
+  }
+
+  public synchronized void dump() {
+    for (String id : queues.keySet()) {
+      FetchItemQueue fiq = queues.get(id);
+      if (fiq.getQueueSize() == 0)
+        continue;
+      LOG.info("* queue: " + id);
+      fiq.dump();
+    }
+  }
+}
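
FetchItemQueues partitions incoming URLs into per-host (or per-domain/IP) FetchItemQueue instances according to fetcher.queue.mode and hands out at most one eligible item per call, which is the feed/poll cycle QueueFeeder and FetcherThread are built around. A minimal sketch of that cycle; the demo class and URLs are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.fetcher.FetchItem;
import org.apache.nutch.fetcher.FetchItemQueues;

public class FetchItemQueuesDemo {
  public static void main(String[] args) throws InterruptedException {
    Configuration conf = new Configuration();
    conf.set("fetcher.queue.mode", FetchItemQueues.QUEUE_MODE_HOST);
    FetchItemQueues queues = new FetchItemQueues(conf);

    // feeder side: each URL is routed to its host's queue
    queues.addFetchItem(new Text("http://example.com/"), new CrawlDatum());
    queues.addFetchItem(new Text("http://example.org/"), new CrawlDatum());

    // fetcher side: poll across all queues until everything has been handed out
    while (queues.getTotalSize() > 0) {
      FetchItem fit = queues.getFetchItem();
      if (fit == null) {   // all queues empty or still within their crawl delay
        Thread.sleep(100);
        continue;
      }
      System.out.println(fit.getQueueID() + " -> " + fit.getUrl());
      queues.finishFetchItem(fit);  // unblocks the queue for its next request
    }
  }
}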

Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=1678281&r1=1678280&r2=1678281&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Fri May  8 04:25:05 2015
@@ -18,28 +18,14 @@ package org.apache.nutch.fetcher;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
 import java.util.*;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-
-
-
-
-
-
-
-
-
-// Slf4j Logging imports
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
@@ -49,18 +35,10 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.NutchWritable;
-import org.apache.nutch.crawl.SignatureFactory;
-import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.metadata.Nutch;
-import org.apache.nutch.net.*;
 import org.apache.nutch.protocol.*;
-import org.apache.nutch.parse.*;
-import org.apache.nutch.scoring.ScoringFilterException;
-import org.apache.nutch.scoring.ScoringFilters;
 import org.apache.nutch.util.*;
 
-import crawlercommons.robots.BaseRobotRules;
-
 /**
  * A queue-based fetcher.
  * 
@@ -146,1058 +124,6 @@ public class Fetcher extends NutchTool i
 
   LinkedList<FetcherThread> fetcherThreads = new LinkedList<FetcherThread>();
 
-  /**
-   * This class described the item to be fetched.
-   */
-  private static class FetchItem {
-    int outlinkDepth = 0;
-    String queueID;
-    Text url;
-    URL u;
-    CrawlDatum datum;
-
-    public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
-      this(url, u, datum, queueID, 0);
-    }
-
-    public FetchItem(Text url, URL u, CrawlDatum datum, String queueID,
-        int outlinkDepth) {
-      this.url = url;
-      this.u = u;
-      this.datum = datum;
-      this.queueID = queueID;
-      this.outlinkDepth = outlinkDepth;
-    }
-
-    /**
-     * Create an item. Queue id will be created based on <code>queueMode</code>
-     * argument, either as a protocol + hostname pair, protocol + IP address
-     * pair or protocol+domain pair.
-     */
-    public static FetchItem create(Text url, CrawlDatum datum, String queueMode) {
-      return create(url, datum, queueMode, 0);
-    }
-
-    public static FetchItem create(Text url, CrawlDatum datum,
-        String queueMode, int outlinkDepth) {
-      String queueID;
-      URL u = null;
-      try {
-        u = new URL(url.toString());
-      } catch (Exception e) {
-        LOG.warn("Cannot parse url: " + url, e);
-        return null;
-      }
-      final String proto = u.getProtocol().toLowerCase();
-      String key;
-      if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
-        try {
-          final InetAddress addr = InetAddress.getByName(u.getHost());
-          key = addr.getHostAddress();
-        } catch (final UnknownHostException e) {
-          // unable to resolve it, so don't fall back to host name
-          LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
-          return null;
-        }
-      } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) {
-        key = URLUtil.getDomainName(u);
-        if (key == null) {
-          LOG.warn("Unknown domain for url: " + url
-              + ", using URL string as key");
-          key = u.toExternalForm();
-        }
-      } else {
-        key = u.getHost();
-        if (key == null) {
-          LOG.warn("Unknown host for url: " + url + ", using URL string as 
key");
-          key = u.toExternalForm();
-        }
-      }
-      queueID = proto + "://" + key.toLowerCase();
-      return new FetchItem(url, u, datum, queueID, outlinkDepth);
-    }
-
-    public CrawlDatum getDatum() {
-      return datum;
-    }
-
-    public String getQueueID() {
-      return queueID;
-    }
-
-    public Text getUrl() {
-      return url;
-    }
-
-    public URL getURL2() {
-      return u;
-    }
-  }
-
-  /**
-   * This class handles FetchItems which come from the same host ID (be it a
-   * proto/hostname or proto/IP pair). It also keeps track of requests in
-   * progress and elapsed time between requests.
-   */
-  private static class FetchItemQueue {
-    List<FetchItem> queue = Collections
-        .synchronizedList(new LinkedList<FetchItem>());
-    AtomicInteger inProgress = new AtomicInteger();
-    AtomicLong nextFetchTime = new AtomicLong();
-    AtomicInteger exceptionCounter = new AtomicInteger();
-    long crawlDelay;
-    long minCrawlDelay;
-    int maxThreads;
-    Configuration conf;
-
-    public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
-        long minCrawlDelay) {
-      this.conf = conf;
-      this.maxThreads = maxThreads;
-      this.crawlDelay = crawlDelay;
-      this.minCrawlDelay = minCrawlDelay;
-      // ready to start
-      setEndTime(System.currentTimeMillis() - crawlDelay);
-    }
-
-    public synchronized int emptyQueue() {
-      int presize = queue.size();
-      queue.clear();
-      return presize;
-    }
-
-    public int getQueueSize() {
-      return queue.size();
-    }
-
-    public int getInProgressSize() {
-      return inProgress.get();
-    }
-
-    public int incrementExceptionCounter() {
-      return exceptionCounter.incrementAndGet();
-    }
-
-    public void finishFetchItem(FetchItem it, boolean asap) {
-      if (it != null) {
-        inProgress.decrementAndGet();
-        setEndTime(System.currentTimeMillis(), asap);
-      }
-    }
-
-    public void addFetchItem(FetchItem it) {
-      if (it == null)
-        return;
-      queue.add(it);
-    }
-
-    public void addInProgressFetchItem(FetchItem it) {
-      if (it == null)
-        return;
-      inProgress.incrementAndGet();
-    }
-
-    public FetchItem getFetchItem() {
-      if (inProgress.get() >= maxThreads)
-        return null;
-      long now = System.currentTimeMillis();
-      if (nextFetchTime.get() > now)
-        return null;
-      FetchItem it = null;
-      if (queue.size() == 0)
-        return null;
-      try {
-        it = queue.remove(0);
-        inProgress.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(
-            "Cannot remove FetchItem from queue or cannot add it to inProgress 
queue",
-            e);
-      }
-      return it;
-    }
-
-    public synchronized void dump() {
-      LOG.info("  maxThreads    = " + maxThreads);
-      LOG.info("  inProgress    = " + inProgress.get());
-      LOG.info("  crawlDelay    = " + crawlDelay);
-      LOG.info("  minCrawlDelay = " + minCrawlDelay);
-      LOG.info("  nextFetchTime = " + nextFetchTime.get());
-      LOG.info("  now           = " + System.currentTimeMillis());
-      for (int i = 0; i < queue.size(); i++) {
-        FetchItem it = queue.get(i);
-        LOG.info("  " + i + ". " + it.url);
-      }
-    }
-
-    private void setEndTime(long endTime) {
-      setEndTime(endTime, false);
-    }
-
-    private void setEndTime(long endTime, boolean asap) {
-      if (!asap)
-        nextFetchTime.set(endTime
-            + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
-      else
-        nextFetchTime.set(endTime);
-    }
-  }
-
-  /**
-   * Convenience class - a collection of queues that keeps track of the total
-   * number of items, and provides items eligible for fetching from any queue.
-   */
-  private static class FetchItemQueues {
-    public static final String DEFAULT_ID = "default";
-    Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
-    AtomicInteger totalSize = new AtomicInteger(0);
-    int maxThreads;
-    long crawlDelay;
-    long minCrawlDelay;
-    long timelimit = -1;
-    int maxExceptionsPerQueue = -1;
-    Configuration conf;
-
-    public static final String QUEUE_MODE_HOST = "byHost";
-    public static final String QUEUE_MODE_DOMAIN = "byDomain";
-    public static final String QUEUE_MODE_IP = "byIP";
-
-    String queueMode;
-
-    public FetchItemQueues(Configuration conf) {
-      this.conf = conf;
-      this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
-      queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
-      // check that the mode is known
-      if (!queueMode.equals(QUEUE_MODE_IP)
-          && !queueMode.equals(QUEUE_MODE_DOMAIN)
-          && !queueMode.equals(QUEUE_MODE_HOST)) {
-        LOG.error("Unknown partition mode : " + queueMode
-            + " - forcing to byHost");
-        queueMode = QUEUE_MODE_HOST;
-      }
-      LOG.info("Using queue mode : " + queueMode);
-
-      this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
-      this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
-          0.0f) * 1000);
-      this.timelimit = conf.getLong("fetcher.timelimit", -1);
-      this.maxExceptionsPerQueue = conf.getInt(
-          "fetcher.max.exceptions.per.queue", -1);
-    }
-
-    public int getTotalSize() {
-      return totalSize.get();
-    }
-
-    public int getQueueCount() {
-      return queues.size();
-    }
-
-    public void addFetchItem(Text url, CrawlDatum datum) {
-      FetchItem it = FetchItem.create(url, datum, queueMode);
-      if (it != null)
-        addFetchItem(it);
-    }
-
-    public synchronized void addFetchItem(FetchItem it) {
-      FetchItemQueue fiq = getFetchItemQueue(it.queueID);
-      fiq.addFetchItem(it);
-      totalSize.incrementAndGet();
-    }
-
-    public void finishFetchItem(FetchItem it) {
-      finishFetchItem(it, false);
-    }
-
-    public void finishFetchItem(FetchItem it, boolean asap) {
-      FetchItemQueue fiq = queues.get(it.queueID);
-      if (fiq == null) {
-        LOG.warn("Attempting to finish item from unknown queue: " + it);
-        return;
-      }
-      fiq.finishFetchItem(it, asap);
-    }
-
-    public synchronized FetchItemQueue getFetchItemQueue(String id) {
-      FetchItemQueue fiq = queues.get(id);
-      if (fiq == null) {
-        // initialize queue
-        fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
-        queues.put(id, fiq);
-      }
-      return fiq;
-    }
-
-    public synchronized FetchItem getFetchItem() {
-      Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
-          .iterator();
-      while (it.hasNext()) {
-        FetchItemQueue fiq = it.next().getValue();
-        // reap empty queues
-        if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
-          it.remove();
-          continue;
-        }
-        FetchItem fit = fiq.getFetchItem();
-        if (fit != null) {
-          totalSize.decrementAndGet();
-          return fit;
-        }
-      }
-      return null;
-    }
-
-    // called only once the feeder has stopped
-    public synchronized int checkTimelimit() {
-      int count = 0;
-
-      if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
-        // emptying the queues
-        count = emptyQueues();
-
-        // there might also be a case where totalsize !=0 but number of queues
-        // == 0
-        // in which case we simply force it to 0 to avoid blocking
-        if (totalSize.get() != 0 && queues.size() == 0)
-          totalSize.set(0);
-      }
-      return count;
-    }
-
-    // empties the queues (used by timebomb and throughput threshold)
-    public synchronized int emptyQueues() {
-      int count = 0;
-
-      for (String id : queues.keySet()) {
-        FetchItemQueue fiq = queues.get(id);
-        if (fiq.getQueueSize() == 0)
-          continue;
-        LOG.info("* queue: " + id + " >> dropping! ");
-        int deleted = fiq.emptyQueue();
-        for (int i = 0; i < deleted; i++) {
-          totalSize.decrementAndGet();
-        }
-        count += deleted;
-      }
-
-      return count;
-    }
-
-    /**
-     * Increment the exception counter of a queue in case of an exception e.g.
-     * timeout; when higher than a given threshold simply empty the queue.
-     * 
-     * @param queueid
-     * @return number of purged items
-     */
-    public synchronized int checkExceptionThreshold(String queueid) {
-      FetchItemQueue fiq = queues.get(queueid);
-      if (fiq == null) {
-        return 0;
-      }
-      if (fiq.getQueueSize() == 0) {
-        return 0;
-      }
-      int excCount = fiq.incrementExceptionCounter();
-      if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
-        // too many exceptions for items in this queue - purge it
-        int deleted = fiq.emptyQueue();
-        LOG.info("* queue: " + queueid + " >> removed " + deleted
-            + " URLs from queue because " + excCount + " exceptions occurred");
-        for (int i = 0; i < deleted; i++) {
-          totalSize.decrementAndGet();
-        }
-        return deleted;
-      }
-      return 0;
-    }
-
-    public synchronized void dump() {
-      for (String id : queues.keySet()) {
-        FetchItemQueue fiq = queues.get(id);
-        if (fiq.getQueueSize() == 0)
-          continue;
-        LOG.info("* queue: " + id);
-        fiq.dump();
-      }
-    }
-  }
-
-  /**
-   * This class feeds the queues with input items, and re-fills them as items
-   * are consumed by FetcherThread-s.
-   */
-  private static class QueueFeeder extends Thread {
-    private RecordReader<Text, CrawlDatum> reader;
-    private FetchItemQueues queues;
-    private int size;
-    private long timelimit = -1;
-
-    public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
-        FetchItemQueues queues, int size) {
-      this.reader = reader;
-      this.queues = queues;
-      this.size = size;
-      this.setDaemon(true);
-      this.setName("QueueFeeder");
-    }
-
-    public void setTimeLimit(long tl) {
-      timelimit = tl;
-    }
-
-    public void run() {
-      boolean hasMore = true;
-      int cnt = 0;
-      int timelimitcount = 0;
-      while (hasMore) {
-        if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
-          // enough .. lets' simply
-          // read all the entries from the input without processing them
-          try {
-            Text url = new Text();
-            CrawlDatum datum = new CrawlDatum();
-            hasMore = reader.next(url, datum);
-            timelimitcount++;
-          } catch (IOException e) {
-            LOG.error("QueueFeeder error reading input, record " + cnt, e);
-            return;
-          }
-          continue;
-        }
-        int feed = size - queues.getTotalSize();
-        if (feed <= 0) {
-          // queues are full - spin-wait until they have some free space
-          try {
-            Thread.sleep(1000);
-          } catch (Exception e) {
-          }
-          ;
-          continue;
-        } else {
-          LOG.debug("-feeding " + feed + " input urls ...");
-          while (feed > 0 && hasMore) {
-            try {
-              Text url = new Text();
-              CrawlDatum datum = new CrawlDatum();
-              hasMore = reader.next(url, datum);
-              if (hasMore) {
-                queues.addFetchItem(url, datum);
-                cnt++;
-                feed--;
-              }
-            } catch (IOException e) {
-              LOG.error("QueueFeeder error reading input, record " + cnt, e);
-              return;
-            }
-          }
-        }
-      }
-      LOG.info("QueueFeeder finished: total " + cnt
-          + " records + hit by time limit :" + timelimitcount);
-    }
-  }
-
-  /**
-   * This class picks items from queues and fetches the pages.
-   */
-  private class FetcherThread extends Thread {
-    private Configuration conf;
-    private URLFilters urlFilters;
-    private ScoringFilters scfilters;
-    private ParseUtil parseUtil;
-    private URLNormalizers normalizers;
-    private ProtocolFactory protocolFactory;
-    private long maxCrawlDelay;
-    private String queueMode;
-    private int maxRedirect;
-    private String reprUrl;
-    private boolean redirecting;
-    private int redirectCount;
-    private boolean ignoreExternalLinks;
-
-    // Used by fetcher.follow.outlinks.depth in parse
-    private int maxOutlinksPerPage;
-    private final int maxOutlinks;
-    private final int interval;
-    private int maxOutlinkDepth;
-    private int maxOutlinkDepthNumLinks;
-    private boolean outlinksIgnoreExternal;
-
-    private int outlinksDepthDivisor;
-    private boolean skipTruncated;
-
-    private boolean halted = false;
-
-    public FetcherThread(Configuration conf) {
-      this.setDaemon(true); // don't hang JVM on exit
-      this.setName("FetcherThread"); // use an informative name
-      this.conf = conf;
-      this.urlFilters = new URLFilters(conf);
-      this.scfilters = new ScoringFilters(conf);
-      this.parseUtil = new ParseUtil(conf);
-      this.skipTruncated = conf.getBoolean(ParseSegment.SKIP_TRUNCATED, true);
-      this.protocolFactory = new ProtocolFactory(conf);
-      this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
-      this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
-      queueMode = conf.get("fetcher.queue.mode",
-          FetchItemQueues.QUEUE_MODE_HOST);
-      // check that the mode is known
-      if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP)
-          && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
-          && !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) {
-        LOG.error("Unknown partition mode : " + queueMode
-            + " - forcing to byHost");
-        queueMode = FetchItemQueues.QUEUE_MODE_HOST;
-      }
-      LOG.info("Using queue mode : " + queueMode);
-      this.maxRedirect = conf.getInt("http.redirect.max", 3);
-      this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links",
-          false);
-
-      maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
-      maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE
-          : maxOutlinksPerPage;
-      interval = conf.getInt("db.fetch.interval.default", 2592000);
-      ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
-      maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1);
-      outlinksIgnoreExternal = conf.getBoolean(
-          "fetcher.follow.outlinks.ignore.external", false);
-      maxOutlinkDepthNumLinks = conf.getInt(
-          "fetcher.follow.outlinks.num.links", 4);
-      outlinksDepthDivisor = conf.getInt(
-          "fetcher.follow.outlinks.depth.divisor", 2);
-    }
-
-    @SuppressWarnings("fallthrough")
-    public void run() {
-      activeThreads.incrementAndGet(); // count threads
-
-      FetchItem fit = null;
-      try {
-
-        while (true) {
-          // check whether must be stopped
-          if (isHalted()) {
-            LOG.debug(getName() + " set to halted");
-            fit = null;
-            return;
-          }
-
-          fit = fetchQueues.getFetchItem();
-          if (fit == null) {
-            if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
-              LOG.debug(getName() + " spin-waiting ...");
-              // spin-wait.
-              spinWaiting.incrementAndGet();
-              try {
-                Thread.sleep(500);
-              } catch (Exception e) {
-              }
-              spinWaiting.decrementAndGet();
-              continue;
-            } else {
-              // all done, finish this thread
-              LOG.info("Thread " + getName() + " has no more work available");
-              return;
-            }
-          }
-          lastRequestStart.set(System.currentTimeMillis());
-          Text reprUrlWritable = (Text) fit.datum.getMetaData().get(
-              Nutch.WRITABLE_REPR_URL_KEY);
-          if (reprUrlWritable == null) {
-            reprUrl = fit.url.toString();
-          } else {
-            reprUrl = reprUrlWritable.toString();
-          }
-          try {
-            // fetch the page
-            redirecting = false;
-            redirectCount = 0;
-            do {
-              if (LOG.isInfoEnabled()) {
-                LOG.info("fetching " + fit.url + " (queue crawl delay="
-                    + fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay
-                    + "ms)");
-              }
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("redirectCount=" + redirectCount);
-              }
-              redirecting = false;
-              Protocol protocol = this.protocolFactory.getProtocol(fit.url
-                  .toString());
-              BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
-              if (!rules.isAllowed(fit.u.toString())) {
-                // unblock
-                fetchQueues.finishFetchItem(fit, true);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Denied by robots.txt: " + fit.url);
-                }
-                output(fit.url, fit.datum, null,
-                    ProtocolStatus.STATUS_ROBOTS_DENIED,
-                    CrawlDatum.STATUS_FETCH_GONE);
-                reporter.incrCounter("FetcherStatus", "robots_denied", 1);
-                continue;
-              }
-              if (rules.getCrawlDelay() > 0) {
-                if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
-                  // unblock
-                  fetchQueues.finishFetchItem(fit, true);
-                  LOG.debug("Crawl-Delay for " + fit.url + " too long ("
-                      + rules.getCrawlDelay() + "), skipping");
-                  output(fit.url, fit.datum, null,
-                      ProtocolStatus.STATUS_ROBOTS_DENIED,
-                      CrawlDatum.STATUS_FETCH_GONE);
-                  reporter.incrCounter("FetcherStatus",
-                      "robots_denied_maxcrawldelay", 1);
-                  continue;
-                } else {
-                  FetchItemQueue fiq = fetchQueues
-                      .getFetchItemQueue(fit.queueID);
-                  fiq.crawlDelay = rules.getCrawlDelay();
-                  if (LOG.isDebugEnabled()) {
-                    LOG.info("Crawl delay for queue: " + fit.queueID
-                        + " is set to " + fiq.crawlDelay
-                        + " as per robots.txt. url: " + fit.url);
-                  }
-                }
-              }
-              ProtocolOutput output = protocol.getProtocolOutput(fit.url,
-                  fit.datum);
-              ProtocolStatus status = output.getStatus();
-              Content content = output.getContent();
-              ParseStatus pstatus = null;
-              // unblock queue
-              fetchQueues.finishFetchItem(fit);
-
-              String urlString = fit.url.toString();
-
-              reporter.incrCounter("FetcherStatus", status.getName(), 1);
-
-              switch (status.getCode()) {
-
-              case ProtocolStatus.WOULDBLOCK:
-                // retry ?
-                fetchQueues.addFetchItem(fit);
-                break;
-
-              case ProtocolStatus.SUCCESS: // got a page
-                pstatus = output(fit.url, fit.datum, content, status,
-                    CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
-                updateStatus(content.getContent().length);
-                if (pstatus != null && pstatus.isSuccess()
-                    && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
-                  String newUrl = pstatus.getMessage();
-                  int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
-                  Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
-                      newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME,
-                      Fetcher.CONTENT_REDIR);
-                  if (redirUrl != null) {
-                    fit = queueRedirect(redirUrl, fit);
-                  }
-                }
-                break;
-
-              case ProtocolStatus.MOVED: // redirect
-              case ProtocolStatus.TEMP_MOVED:
-                int code;
-                boolean temp;
-                if (status.getCode() == ProtocolStatus.MOVED) {
-                  code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
-                  temp = false;
-                } else {
-                  code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
-                  temp = true;
-                }
-                output(fit.url, fit.datum, content, status, code);
-                String newUrl = status.getMessage();
-                Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
-                    newUrl, temp, Fetcher.PROTOCOL_REDIR);
-                if (redirUrl != null) {
-                  fit = queueRedirect(redirUrl, fit);
-                } else {
-                  // stop redirecting
-                  redirecting = false;
-                }
-                break;
-
-              case ProtocolStatus.EXCEPTION:
-                logError(fit.url, status.getMessage());
-                int killedURLs = fetchQueues.checkExceptionThreshold(fit
-                    .getQueueID());
-                if (killedURLs != 0)
-                  reporter.incrCounter("FetcherStatus",
-                      "AboveExceptionThresholdInQueue", killedURLs);
-                /* FALLTHROUGH */
-              case ProtocolStatus.RETRY: // retry
-              case ProtocolStatus.BLOCKED:
-                output(fit.url, fit.datum, null, status,
-                    CrawlDatum.STATUS_FETCH_RETRY);
-                break;
-
-              case ProtocolStatus.GONE: // gone
-              case ProtocolStatus.NOTFOUND:
-              case ProtocolStatus.ACCESS_DENIED:
-              case ProtocolStatus.ROBOTS_DENIED:
-                output(fit.url, fit.datum, null, status,
-                    CrawlDatum.STATUS_FETCH_GONE);
-                break;
-
-              case ProtocolStatus.NOTMODIFIED:
-                output(fit.url, fit.datum, null, status,
-                    CrawlDatum.STATUS_FETCH_NOTMODIFIED);
-                break;
-
-              default:
-                if (LOG.isWarnEnabled()) {
-                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
-                }
-                output(fit.url, fit.datum, null, status,
-                    CrawlDatum.STATUS_FETCH_RETRY);
-              }
-
-              if (redirecting && redirectCount > maxRedirect) {
-                fetchQueues.finishFetchItem(fit);
-                if (LOG.isInfoEnabled()) {
-                  LOG.info(" - redirect count exceeded " + fit.url);
-                }
-                output(fit.url, fit.datum, null,
-                    ProtocolStatus.STATUS_REDIR_EXCEEDED,
-                    CrawlDatum.STATUS_FETCH_GONE);
-              }
-
-            } while (redirecting && (redirectCount <= maxRedirect));
-
-          } catch (Throwable t) { // unexpected exception
-            // unblock
-            fetchQueues.finishFetchItem(fit);
-            logError(fit.url, StringUtils.stringifyException(t));
-            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
-                CrawlDatum.STATUS_FETCH_RETRY);
-          }
-        }
-
-      } catch (Throwable e) {
-        if (LOG.isErrorEnabled()) {
-          LOG.error("fetcher caught:" + e.toString());
-        }
-      } finally {
-        if (fit != null)
-          fetchQueues.finishFetchItem(fit);
-        activeThreads.decrementAndGet(); // count threads
-        LOG.info("-finishing thread " + getName() + ", activeThreads="
-            + activeThreads);
-      }
-    }
-
-    private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
-        String newUrl, boolean temp, String redirType)
-        throws MalformedURLException, URLFilterException {
-      newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
-      newUrl = urlFilters.filter(newUrl);
-
-      if (ignoreExternalLinks) {
-        try {
-          String origHost = new URL(urlString).getHost().toLowerCase();
-          String newHost = new URL(newUrl).getHost().toLowerCase();
-          if (!origHost.equals(newHost)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(" - ignoring redirect " + redirType + " from "
-                  + urlString + " to " + newUrl
-                  + " because external links are ignored");
-            }
-            return null;
-          }
-        } catch (MalformedURLException e) {
-        }
-      }
-
-      if (newUrl != null && !newUrl.equals(urlString)) {
-        reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
-        url = new Text(newUrl);
-        if (maxRedirect > 0) {
-          redirecting = true;
-          redirectCount++;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(" - " + redirType + " redirect to " + url
-                + " (fetching now)");
-          }
-          return url;
-        } else {
-          CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
-              datum.getFetchInterval(), datum.getScore());
-          // transfer existing metadata
-          newDatum.getMetaData().putAll(datum.getMetaData());
-          try {
-            scfilters.initialScore(url, newDatum);
-          } catch (ScoringFilterException e) {
-            e.printStackTrace();
-          }
-          if (reprUrl != null) {
-            newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
-                new Text(reprUrl));
-          }
-          output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(" - " + redirType + " redirect to " + url
-                + " (fetching later)");
-          }
-          return null;
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(" - " + redirType + " redirect skipped: "
-              + (newUrl != null ? "to same url" : "filtered"));
-        }
-        return null;
-      }
-    }
-
-    private FetchItem queueRedirect(Text redirUrl, FetchItem fit)
-        throws ScoringFilterException {
-      CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
-          fit.datum.getFetchInterval(), fit.datum.getScore());
-      // transfer all existing metadata to the redirect
-      newDatum.getMetaData().putAll(fit.datum.getMetaData());
-      scfilters.initialScore(redirUrl, newDatum);
-      if (reprUrl != null) {
-        newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
-            new Text(reprUrl));
-      }
-      fit = FetchItem.create(redirUrl, newDatum, queueMode);
-      if (fit != null) {
-        FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
-        fiq.addInProgressFetchItem(fit);
-      } else {
-        // stop redirecting
-        redirecting = false;
-        reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect",
-            1);
-      }
-      return fit;
-    }
-
-    private void logError(Text url, String message) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("fetch of " + url + " failed with: " + message);
-      }
-      errors.incrementAndGet();
-    }
-
-    private ParseStatus output(Text key, CrawlDatum datum, Content content,
-        ProtocolStatus pstatus, int status) {
-
-      return output(key, datum, content, pstatus, status, 0);
-    }
-
-    private ParseStatus output(Text key, CrawlDatum datum, Content content,
-        ProtocolStatus pstatus, int status, int outlinkDepth) {
-
-      datum.setStatus(status);
-      datum.setFetchTime(System.currentTimeMillis());
-      if (pstatus != null)
-        datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
-
-      ParseResult parseResult = null;
-      if (content != null) {
-        Metadata metadata = content.getMetadata();
-
-        // store the guessed content type in the crawldatum
-        if (content.getContentType() != null)
-          datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
-              new Text(content.getContentType()));
-
-        // add segment to metadata
-        metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
-        // add score to content metadata so that ParseSegment can pick it up.
-        try {
-          scfilters.passScoreBeforeParsing(key, datum, content);
-        } catch (Exception e) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
-          }
-        }
-        /*
-         * Note: Fetcher will only follow meta-redirects coming from the
-         * original URL.
-         */
-        if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
-          if (!skipTruncated
-              || (skipTruncated && !ParseSegment.isTruncated(content))) {
-            try {
-              parseResult = this.parseUtil.parse(content);
-            } catch (Exception e) {
-              LOG.warn("Error parsing: " + key + ": "
-                  + StringUtils.stringifyException(e));
-            }
-          }
-
-          if (parseResult == null) {
-            byte[] signature = SignatureFactory.getSignature(getConf())
-                .calculate(content, new ParseStatus().getEmptyParse(conf));
-            datum.setSignature(signature);
-          }
-        }
-
-        /*
-         * Store status code in content So we can read this value during parsing
-         * (as a separate job) and decide to parse or not.
-         */
-        content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
-            Integer.toString(status));
-      }
-
-      try {
-        output.collect(key, new NutchWritable(datum));
-        if (content != null && storingContent)
-          output.collect(key, new NutchWritable(content));
-        if (parseResult != null) {
-          for (Entry<Text, Parse> entry : parseResult) {
-            Text url = entry.getKey();
-            Parse parse = entry.getValue();
-            ParseStatus parseStatus = parse.getData().getStatus();
-            ParseData parseData = parse.getData();
-
-            if (!parseStatus.isSuccess()) {
-              LOG.warn("Error parsing: " + key + ": " + parseStatus);
-              parse = parseStatus.getEmptyParse(getConf());
-            }
-
-            // Calculate page signature. For non-parsing fetchers this will
-            // be done in ParseSegment
-            byte[] signature = SignatureFactory.getSignature(getConf())
-                .calculate(content, parse);
-            // Ensure segment name and score are in parseData metadata
-            parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
-            parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
-                StringUtil.toHexString(signature));
-            // Pass fetch time to content meta
-            parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
-                Long.toString(datum.getFetchTime()));
-            if (url.equals(key))
-              datum.setSignature(signature);
-            try {
-              scfilters.passScoreAfterParsing(url, content, parse);
-            } catch (Exception e) {
-              if (LOG.isWarnEnabled()) {
-                LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
-              }
-            }
-
-            String fromHost;
-
-            // collect outlinks for subsequent db update
-            Outlink[] links = parseData.getOutlinks();
-            int outlinksToStore = Math.min(maxOutlinks, links.length);
-            if (ignoreExternalLinks) {
-              try {
-                fromHost = new URL(url.toString()).getHost().toLowerCase();
-              } catch (MalformedURLException e) {
-                fromHost = null;
-              }
-            } else {
-              fromHost = null;
-            }
-
-            int validCount = 0;
-
-            // Process all outlinks, normalize, filter and deduplicate
-            List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore);
-            HashSet<String> outlinks = new HashSet<String>(outlinksToStore);
-            for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
-              String toUrl = links[i].getToUrl();
-
-              toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
-                  fromHost, ignoreExternalLinks, urlFilters, normalizers);
-              if (toUrl == null) {
-                continue;
-              }
-
-              validCount++;
-              links[i].setUrl(toUrl);
-              outlinkList.add(links[i]);
-              outlinks.add(toUrl);
-            }
-
-            // Only process depth N outlinks
-            if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
-              reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
-                  outlinks.size());
-
-              // Counter to limit num outlinks to follow per page
-              int outlinkCounter = 0;
-
-              // Calculate variable number of outlinks by depth using the
-              // divisor (outlinks = Math.floor(divisor / depth * num.links))
-              int maxOutlinksByDepth = (int) Math.floor(outlinksDepthDivisor
-                  / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
-
-              String followUrl;
-
-              // Walk over the outlinks and add as new FetchItem to the queues
-              Iterator<String> iter = outlinks.iterator();
-              while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
-                followUrl = iter.next();
-
-                // Check whether we'll follow external outlinks
-                if (outlinksIgnoreExternal) {
-                  if (!URLUtil.getHost(url.toString()).equals(
-                      URLUtil.getHost(followUrl))) {
-                    continue;
-                  }
-                }
-
-                reporter
-                    .incrCounter("FetcherOutlinks", "outlinks_following", 1);
-
-                // Create new FetchItem with depth incremented
-                FetchItem fit = FetchItem.create(new Text(followUrl),
-                    new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
-                    queueMode, outlinkDepth + 1);
-                fetchQueues.addFetchItem(fit);
-
-                outlinkCounter++;
-              }
-            }
-
-            // Overwrite the outlinks in ParseData with the normalized and
-            // filtered set
-            parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList
-                .size()]));
-
-            output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
-                parse.getText()), parseData, parse.isCanonical())));
-          }
-        }
-      } catch (IOException e) {
-        if (LOG.isErrorEnabled()) {
-          LOG.error("fetcher caught:" + e.toString());
-        }
-      }
-
-      // return parse status if it exits
-      if (parseResult != null && !parseResult.isEmpty()) {
-        Parse p = parseResult.get(content.getUrl());
-        if (p != null) {
-          reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p
-              .getData().getStatus().getMajorCode()], 1);
-          return p.getData().getStatus();
-        }
-      }
-      return null;
-    }
-
-    public synchronized void setHalted(boolean halted) {
-      this.halted = halted;
-    }
-
-    public synchronized boolean isHalted() {
-      return halted;
-    }
-
-  }
-
   public Fetcher() {
          super(null);
   }
@@ -1206,11 +132,6 @@ public class Fetcher extends NutchTool i
     super(conf);
   }
 
-  private void updateStatus(int bytesInPage) throws IOException {
-    pages.incrementAndGet();
-    bytes.addAndGet(bytesInPage);
-  }
-
   private void reportStatus(int pagesLastSec, int bytesLastSec)
       throws IOException {
     StringBuilder status = new StringBuilder();
@@ -1292,7 +213,9 @@ public class Fetcher extends NutchTool i
     getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
 
     for (int i = 0; i < threadCount; i++) { // spawn threads
-      FetcherThread t = new FetcherThread(getConf());
+      FetcherThread t = new FetcherThread(getConf(), getActiveThreads(), fetchQueues,
+          feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName,
+          parsing, output, storingContent, pages, bytes);
       fetcherThreads.add(t);
       t.start();
     }
@@ -1437,7 +360,9 @@ public class Fetcher extends NutchTool i
                   + additionalThreads + " new threads");
               // activate new threads
               for (int i = 0; i < additionalThreads; i++) {
-                FetcherThread thread = new FetcherThread(getConf());
+                FetcherThread thread = new FetcherThread(getConf(), getActiveThreads(), fetchQueues,
+                    feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName, parsing,
+                    output, storingContent, pages, bytes);
                 fetcherThreads.add(thread);
                 thread.start();
               }
@@ -1479,7 +404,7 @@ public class Fetcher extends NutchTool i
             FetcherThread thread = fetcherThreads.get(i);
             if (thread.isAlive()) {
               LOG.warn("Thread #" + i + " hung while processing "
-                  + thread.reprUrl);
+                  + thread.getReprUrl());
               if (LOG.isDebugEnabled()) {
                 StackTraceElement[] stack = thread.getStackTrace();
                 StringBuilder sb = new StringBuilder();
@@ -1626,6 +551,10 @@ public class Fetcher extends NutchTool i
     }
   }
 
+  private AtomicInteger getActiveThreads() {
+    return activeThreads;
+  }
+  
   @Override
  public Map<String, Object> run(Map<String, String> args, String crawlId) throws Exception {
 

Added: nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java?rev=1678281&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java Fri May  8 
04:25:05 2015
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseOutputFormat;
+import org.apache.nutch.parse.ParseResult;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseUtil;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.Protocol;
+import org.apache.nutch.protocol.ProtocolFactory;
+import org.apache.nutch.protocol.ProtocolOutput;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import crawlercommons.robots.BaseRobotRules;
+
+/**
+ * This class picks items from queues and fetches the pages.
+ */
+public class FetcherThread extends Thread {
+  
+  private static final Logger LOG = LoggerFactory.getLogger(FetcherThread.class);
+
+  private Configuration conf;
+  private URLFilters urlFilters;
+  private ScoringFilters scfilters;
+  private ParseUtil parseUtil;
+  private URLNormalizers normalizers;
+  private ProtocolFactory protocolFactory;
+  private long maxCrawlDelay;
+  private String queueMode;
+  private int maxRedirect;
+  private String reprUrl;
+  private boolean redirecting;
+  private int redirectCount;
+  private boolean ignoreExternalLinks;
+
+  // Used by fetcher.follow.outlinks.depth in parse
+  private int maxOutlinksPerPage;
+  private final int maxOutlinks;
+  private final int interval;
+  private int maxOutlinkDepth;
+  private int maxOutlinkDepthNumLinks;
+  private boolean outlinksIgnoreExternal;
+
+  private int outlinksDepthDivisor;
+  private boolean skipTruncated;
+
+  private boolean halted = false;
+
+  private AtomicInteger activeThreads;
+
+  private Object fetchQueues;
+
+  private QueueFeeder feeder;
+
+  private Object spinWaiting;
+
+  private AtomicLong lastRequestStart;
+
+  private Reporter reporter;
+
+  private AtomicInteger errors;
+
+  private String segmentName;
+
+  private boolean parsing;
+
+  private OutputCollector<Text, NutchWritable> output;
+
+  private boolean storingContent;
+
+  private AtomicInteger pages;
+
+  private AtomicLong bytes;
+
+  public FetcherThread(Configuration conf, AtomicInteger activeThreads, FetchItemQueues fetchQueues,
+      QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong lastRequestStart, Reporter reporter,
+      AtomicInteger errors, String segmentName, boolean parsing, OutputCollector<Text, NutchWritable> output,
+      boolean storingContent, AtomicInteger pages, AtomicLong bytes) {
+    this.setDaemon(true); // don't hang JVM on exit
+    this.setName("FetcherThread"); // use an informative name
+    this.conf = conf;
+    this.urlFilters = new URLFilters(conf);
+    this.scfilters = new ScoringFilters(conf);
+    this.parseUtil = new ParseUtil(conf);
+    this.skipTruncated = conf.getBoolean(ParseSegment.SKIP_TRUNCATED, true);
+    this.protocolFactory = new ProtocolFactory(conf);
+    this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
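+    // Note: fetcher.max.crawl.delay is configured in seconds and converted to
+    // milliseconds here, assuming it is later compared against the robots.txt
+    // Crawl-Delay reported by crawler-commons (BaseRobotRules.getCrawlDelay()),
+    // which is taken to be in milliseconds as well.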
+    this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+    this.activeThreads = activeThreads;
+    this.fetchQueues = fetchQueues;
+    this.feeder = feeder;
+    this.spinWaiting = spinWaiting;
+    this.lastRequestStart = lastRequestStart;
+    this.reporter = reporter;
+    this.errors = errors;
+    this.segmentName = segmentName;
+    this.parsing = parsing;
+    this.output = output;
+    this.storingContent = storingContent;
+    this.pages = pages;
+    this.bytes = bytes;
+    queueMode = conf.get("fetcher.queue.mode",
+        FetchItemQueues.QUEUE_MODE_HOST);
+    // check that the mode is known
+    if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP)
+        && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
+        && !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) {
+      LOG.error("Unknown partition mode : " + queueMode
+          + " - forcing to byHost");
+      queueMode = FetchItemQueues.QUEUE_MODE_HOST;
+    }
+    LOG.info("Using queue mode : " + queueMode);
+    this.maxRedirect = conf.getInt("http.redirect.max", 3);
+    this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links",
+        false);
+
+    maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
+    maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE
+        : maxOutlinksPerPage;
+    interval = conf.getInt("db.fetch.interval.default", 2592000);
+    ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
+    maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1);
+    outlinksIgnoreExternal = conf.getBoolean(
+        "fetcher.follow.outlinks.ignore.external", false);
+    maxOutlinkDepthNumLinks = conf.getInt(
+        "fetcher.follow.outlinks.num.links", 4);
+    outlinksDepthDivisor = conf.getInt(
+        "fetcher.follow.outlinks.depth.divisor", 2);
+  }
+
+  @SuppressWarnings("fallthrough")
+  public void run() {
+    activeThreads.incrementAndGet(); // count threads
+
+    FetchItem fit = null;
+    try {
+
+      while (true) {
+        // check whether must be stopped
+        if (isHalted()) {
+          LOG.debug(getName() + " set to halted");
+          fit = null;
+          return;
+        }
+
+        fit = ((FetchItemQueues) fetchQueues).getFetchItem();
+        if (fit == null) {
+          if (feeder.isAlive() || ((FetchItemQueues) fetchQueues).getTotalSize() > 0) {
+            LOG.debug(getName() + " spin-waiting ...");
+            // spin-wait.
+            ((AtomicInteger) spinWaiting).incrementAndGet();
+            try {
+              Thread.sleep(500);
+            } catch (Exception e) {
+            }
+            ((AtomicInteger) spinWaiting).decrementAndGet();
+            continue;
+          } else {
+            // all done, finish this thread
+            LOG.info("Thread " + getName() + " has no more work available");
+            return;
+          }
+        }
+        lastRequestStart.set(System.currentTimeMillis());
+        Text reprUrlWritable = (Text) fit.datum.getMetaData().get(
+            Nutch.WRITABLE_REPR_URL_KEY);
+        if (reprUrlWritable == null) {
+          setReprUrl(fit.url.toString());
+        } else {
+          setReprUrl(reprUrlWritable.toString());
+        }
+        try {
+          // fetch the page
+          redirecting = false;
+          redirectCount = 0;
+          do {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("fetching " + fit.url + " (queue crawl delay="
+                  + ((FetchItemQueues) fetchQueues).getFetchItemQueue(fit.queueID).crawlDelay
+                  + "ms)");
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("redirectCount=" + redirectCount);
+            }
+            redirecting = false;
+            Protocol protocol = this.protocolFactory.getProtocol(fit.url
+                .toString());
+            BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
+            if (!rules.isAllowed(fit.u.toString())) {
+              // unblock
+              ((FetchItemQueues) fetchQueues).finishFetchItem(fit, true);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Denied by robots.txt: " + fit.url);
+              }
+              output(fit.url, fit.datum, null,
+                  ProtocolStatus.STATUS_ROBOTS_DENIED,
+                  CrawlDatum.STATUS_FETCH_GONE);
+              reporter.incrCounter("FetcherStatus", "robots_denied", 1);
+              continue;
+            }
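+            // Illustrative numbers, assuming crawler-commons reports the
+            // robots.txt Crawl-delay in milliseconds: "Crawl-delay: 5" comes
+            // back as 5000, which is below the default maxCrawlDelay of
+            // 30000 ms and is therefore applied to this queue below.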
+            if (rules.getCrawlDelay() > 0) {
+              if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
+                // unblock
+                ((FetchItemQueues) fetchQueues).finishFetchItem(fit, true);
+                LOG.debug("Crawl-Delay for " + fit.url + " too long ("
+                    + rules.getCrawlDelay() + "), skipping");
+                output(fit.url, fit.datum, null,
+                    ProtocolStatus.STATUS_ROBOTS_DENIED,
+                    CrawlDatum.STATUS_FETCH_GONE);
+                reporter.incrCounter("FetcherStatus",
+                    "robots_denied_maxcrawldelay", 1);
+                continue;
+              } else {
+                FetchItemQueue fiq = ((FetchItemQueues) fetchQueues)
+                    .getFetchItemQueue(fit.queueID);
+                fiq.crawlDelay = rules.getCrawlDelay();
+                if (LOG.isDebugEnabled()) {
+                  LOG.info("Crawl delay for queue: " + fit.queueID
+                      + " is set to " + fiq.crawlDelay
+                      + " as per robots.txt. url: " + fit.url);
+                }
+              }
+            }
+            ProtocolOutput output = protocol.getProtocolOutput(fit.url,
+                fit.datum);
+            ProtocolStatus status = output.getStatus();
+            Content content = output.getContent();
+            ParseStatus pstatus = null;
+            // unblock queue
+            ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+
+            String urlString = fit.url.toString();
+
+            reporter.incrCounter("FetcherStatus", status.getName(), 1);
+
+            switch (status.getCode()) {
+
+            case ProtocolStatus.WOULDBLOCK:
+              // retry ?
+              ((FetchItemQueues) fetchQueues).addFetchItem(fit);
+              break;
+
+            case ProtocolStatus.SUCCESS: // got a page
+              pstatus = output(fit.url, fit.datum, content, status,
+                  CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
+              updateStatus(content.getContent().length);
+              if (pstatus != null && pstatus.isSuccess()
+                  && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+                String newUrl = pstatus.getMessage();
+                int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
+                Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+                    newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME,
+                    Fetcher.CONTENT_REDIR);
+                if (redirUrl != null) {
+                  queueRedirect(redirUrl, fit);
+                }
+              }
+              break;
+
+            case ProtocolStatus.MOVED: // redirect
+            case ProtocolStatus.TEMP_MOVED:
+              int code;
+              boolean temp;
+              if (status.getCode() == ProtocolStatus.MOVED) {
+                code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
+                temp = false;
+              } else {
+                code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
+                temp = true;
+              }
+              output(fit.url, fit.datum, content, status, code);
+              String newUrl = status.getMessage();
+              Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+                  newUrl, temp, Fetcher.PROTOCOL_REDIR);
+              if (redirUrl != null) {
+                queueRedirect(redirUrl, fit);
+              } else {
+                // stop redirecting
+                redirecting = false;
+              }
+              break;
+
+            case ProtocolStatus.EXCEPTION:
+              logError(fit.url, status.getMessage());
+              int killedURLs = ((FetchItemQueues) fetchQueues).checkExceptionThreshold(fit
+                  .getQueueID());
+              if (killedURLs != 0)
+                reporter.incrCounter("FetcherStatus",
+                    "AboveExceptionThresholdInQueue", killedURLs);
+              /* FALLTHROUGH */
+            case ProtocolStatus.RETRY: // retry
+            case ProtocolStatus.BLOCKED:
+              output(fit.url, fit.datum, null, status,
+                  CrawlDatum.STATUS_FETCH_RETRY);
+              break;
+
+            case ProtocolStatus.GONE: // gone
+            case ProtocolStatus.NOTFOUND:
+            case ProtocolStatus.ACCESS_DENIED:
+            case ProtocolStatus.ROBOTS_DENIED:
+              output(fit.url, fit.datum, null, status,
+                  CrawlDatum.STATUS_FETCH_GONE);
+              break;
+
+            case ProtocolStatus.NOTMODIFIED:
+              output(fit.url, fit.datum, null, status,
+                  CrawlDatum.STATUS_FETCH_NOTMODIFIED);
+              break;
+
+            default:
+              if (LOG.isWarnEnabled()) {
+                LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+              }
+              output(fit.url, fit.datum, null, status,
+                  CrawlDatum.STATUS_FETCH_RETRY);
+            }
+
+            if (redirecting && redirectCount > maxRedirect) {
+              ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+              if (LOG.isInfoEnabled()) {
+                LOG.info(" - redirect count exceeded " + fit.url);
+              }
+              output(fit.url, fit.datum, null,
+                  ProtocolStatus.STATUS_REDIR_EXCEEDED,
+                  CrawlDatum.STATUS_FETCH_GONE);
+            }
+
+          } while (redirecting && (redirectCount <= maxRedirect));
+
+        } catch (Throwable t) { // unexpected exception
+          // unblock
+          ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+          logError(fit.url, StringUtils.stringifyException(t));
+          output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
+              CrawlDatum.STATUS_FETCH_RETRY);
+        }
+      }
+
+    } catch (Throwable e) {
+      if (LOG.isErrorEnabled()) {
+        LOG.error("fetcher caught:" + e.toString());
+      }
+    } finally {
+      if (fit != null)
+        ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+      activeThreads.decrementAndGet(); // count threads
+      LOG.info("-finishing thread " + getName() + ", activeThreads="
+          + activeThreads);
+    }
+  }
+
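+  /**
+   * Normalizes and filters a redirect target and decides whether to follow it
+   * immediately (returning the new URL while http.redirect.max > 0) or to
+   * record it as a linked CrawlDatum for a later fetch (returning null).
+   */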
+  private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
+      String newUrl, boolean temp, String redirType)
+      throws MalformedURLException, URLFilterException {
+    newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+    newUrl = urlFilters.filter(newUrl);
+
+    if (ignoreExternalLinks) {
+      try {
+        String origHost = new URL(urlString).getHost().toLowerCase();
+        String newHost = new URL(newUrl).getHost().toLowerCase();
+        if (!origHost.equals(newHost)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(" - ignoring redirect " + redirType + " from "
+                + urlString + " to " + newUrl
+                + " because external links are ignored");
+          }
+          return null;
+        }
+      } catch (MalformedURLException e) {
+      }
+    }
+
+    if (newUrl != null && !newUrl.equals(urlString)) {
+      reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
+      url = new Text(newUrl);
+      if (maxRedirect > 0) {
+        redirecting = true;
+        redirectCount++;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(" - " + redirType + " redirect to " + url
+              + " (fetching now)");
+        }
+        return url;
+      } else {
+        CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
+            datum.getFetchInterval(), datum.getScore());
+        // transfer existing metadata
+        newDatum.getMetaData().putAll(datum.getMetaData());
+        try {
+          scfilters.initialScore(url, newDatum);
+        } catch (ScoringFilterException e) {
+          e.printStackTrace();
+        }
+        if (reprUrl != null) {
+          newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+              new Text(reprUrl));
+        }
+        output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(" - " + redirType + " redirect to " + url
+              + " (fetching later)");
+        }
+        return null;
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(" - " + redirType + " redirect skipped: "
+            + (newUrl != null ? "to same url" : "filtered"));
+      }
+      return null;
+    }
+  }
+
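+  /**
+   * Re-queues a redirect target as an in-progress item in its fetch queue,
+   * carrying over the metadata and score of the redirected datum; if no
+   * FetchItem can be created, redirection stops and a counter is incremented.
+   */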
+  private void queueRedirect(Text redirUrl, FetchItem fit)
+      throws ScoringFilterException {
+    CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
+        fit.datum.getFetchInterval(), fit.datum.getScore());
+    // transfer all existing metadata to the redirect
+    newDatum.getMetaData().putAll(fit.datum.getMetaData());
+    scfilters.initialScore(redirUrl, newDatum);
+    if (reprUrl != null) {
+      newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+          new Text(reprUrl));
+    }
+    fit = FetchItem.create(redirUrl, newDatum, queueMode);
+    if (fit != null) {
+      FetchItemQueue fiq = ((FetchItemQueues) fetchQueues).getFetchItemQueue(fit.queueID);
+      fiq.addInProgressFetchItem(fit);
+    } else {
+      // stop redirecting
+      redirecting = false;
+      reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect",
+          1);
+    }
+  }
+
+  private void logError(Text url, String message) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("fetch of " + url + " failed with: " + message);
+    }
+    errors.incrementAndGet();
+  }
+
+  private ParseStatus output(Text key, CrawlDatum datum, Content content,
+      ProtocolStatus pstatus, int status) {
+
+    return output(key, datum, content, pstatus, status, 0);
+  }
+
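+  /**
+   * Writes the fetched datum (and, when enabled, the raw content and parse
+   * results with their filtered outlinks) to the output collector and returns
+   * the ParseStatus of the page itself, or null if it was not parsed.
+   */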
+  private ParseStatus output(Text key, CrawlDatum datum, Content content,
+      ProtocolStatus pstatus, int status, int outlinkDepth) {
+
+    datum.setStatus(status);
+    datum.setFetchTime(System.currentTimeMillis());
+    if (pstatus != null)
+      datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+
+    ParseResult parseResult = null;
+    if (content != null) {
+      Metadata metadata = content.getMetadata();
+
+      // store the guessed content type in the crawldatum
+      if (content.getContentType() != null)
+        datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
+            new Text(content.getContentType()));
+
+      // add segment to metadata
+      metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
+      // add score to content metadata so that ParseSegment can pick it up.
+      try {
+        scfilters.passScoreBeforeParsing(key, datum, content);
+      } catch (Exception e) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+        }
+      }
+      /*
+       * Note: Fetcher will only follow meta-redirects coming from the
+       * original URL.
+       */
+      if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+        if (!skipTruncated
+            || (skipTruncated && !ParseSegment.isTruncated(content))) {
+          try {
+            parseResult = this.parseUtil.parse(content);
+          } catch (Exception e) {
+            LOG.warn("Error parsing: " + key + ": "
+                + StringUtils.stringifyException(e));
+          }
+        }
+
+        if (parseResult == null) {
+          byte[] signature = SignatureFactory.getSignature(conf)
+              .calculate(content, new ParseStatus().getEmptyParse(conf));
+          datum.setSignature(signature);
+        }
+      }
+
+      /*
+       * Store status code in content So we can read this value during parsing
+       * (as a separate job) and decide to parse or not.
+       */
+      content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
+          Integer.toString(status));
+    }
+
+    try {
+      output.collect(key, new NutchWritable(datum));
+      if (content != null && storingContent)
+        output.collect(key, new NutchWritable(content));
+      if (parseResult != null) {
+        for (Entry<Text, Parse> entry : parseResult) {
+          Text url = entry.getKey();
+          Parse parse = entry.getValue();
+          ParseStatus parseStatus = parse.getData().getStatus();
+          ParseData parseData = parse.getData();
+
+          if (!parseStatus.isSuccess()) {
+            LOG.warn("Error parsing: " + key + ": " + parseStatus);
+            parse = parseStatus.getEmptyParse(conf);
+          }
+
+          // Calculate page signature. For non-parsing fetchers this will
+          // be done in ParseSegment
+          byte[] signature = SignatureFactory.getSignature(conf)
+              .calculate(content, parse);
+          // Ensure segment name and score are in parseData metadata
+          parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
+          parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
+              StringUtil.toHexString(signature));
+          // Pass fetch time to content meta
+          parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
+              Long.toString(datum.getFetchTime()));
+          if (url.equals(key))
+            datum.setSignature(signature);
+          try {
+            scfilters.passScoreAfterParsing(url, content, parse);
+          } catch (Exception e) {
+            if (LOG.isWarnEnabled()) {
+              LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+            }
+          }
+
+          String fromHost;
+
+          // collect outlinks for subsequent db update
+          Outlink[] links = parseData.getOutlinks();
+          int outlinksToStore = Math.min(maxOutlinks, links.length);
+          if (ignoreExternalLinks) {
+            try {
+              fromHost = new URL(url.toString()).getHost().toLowerCase();
+            } catch (MalformedURLException e) {
+              fromHost = null;
+            }
+          } else {
+            fromHost = null;
+          }
+
+          int validCount = 0;
+
+          // Process all outlinks, normalize, filter and deduplicate
+          List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore);
+          HashSet<String> outlinks = new HashSet<String>(outlinksToStore);
+          for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
+            String toUrl = links[i].getToUrl();
+
+            toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
+                fromHost, ignoreExternalLinks, urlFilters, normalizers);
+            if (toUrl == null) {
+              continue;
+            }
+
+            validCount++;
+            links[i].setUrl(toUrl);
+            outlinkList.add(links[i]);
+            outlinks.add(toUrl);
+          }
+
+          // Only process depth N outlinks
+          if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
+            reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
+                outlinks.size());
+
+            // Counter to limit num outlinks to follow per page
+            int outlinkCounter = 0;
+
+            // Calculate variable number of outlinks by depth using the
+            // divisor (outlinks = Math.floor(divisor / depth * num.links))
+            int maxOutlinksByDepth = (int) Math.floor(outlinksDepthDivisor
+                / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
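+            // With the defaults above (divisor = 2, num.links = 4) and Java's
+            // integer division this evaluates to 8 links at depth 0, 4 at
+            // depth 1 and 0 from depth 2 onwards.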
+
+            String followUrl;
+
+            // Walk over the outlinks and add as new FetchItem to the queues
+            Iterator<String> iter = outlinks.iterator();
+            while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
+              followUrl = iter.next();
+
+              // Check whether we'll follow external outlinks
+              if (outlinksIgnoreExternal) {
+                if (!URLUtil.getHost(url.toString()).equals(
+                    URLUtil.getHost(followUrl))) {
+                  continue;
+                }
+              }
+
+              reporter
+                  .incrCounter("FetcherOutlinks", "outlinks_following", 1);
+
+              // Create new FetchItem with depth incremented
+              FetchItem fit = FetchItem.create(new Text(followUrl),
+                  new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
+                  queueMode, outlinkDepth + 1);
+              ((FetchItemQueues) fetchQueues).addFetchItem(fit);
+
+              outlinkCounter++;
+            }
+          }
+
+          // Overwrite the outlinks in ParseData with the normalized and
+          // filtered set
+          parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList
+              .size()]));
+
+          output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+              parse.getText()), parseData, parse.isCanonical())));
+        }
+      }
+    } catch (IOException e) {
+      if (LOG.isErrorEnabled()) {
+        LOG.error("fetcher caught:" + e.toString());
+      }
+    }
+
+    // return parse status if it exists
+    if (parseResult != null && !parseResult.isEmpty()) {
+      Parse p = parseResult.get(content.getUrl());
+      if (p != null) {
+        reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p
+            .getData().getStatus().getMajorCode()], 1);
+        return p.getData().getStatus();
+      }
+    }
+    return null;
+  }
+  
+  private void updateStatus(int bytesInPage) throws IOException {
+    pages.incrementAndGet();
+    bytes.addAndGet(bytesInPage);
+  }
+
+  public synchronized void setHalted(boolean halted) {
+    this.halted = halted;
+  }
+
+  public synchronized boolean isHalted() {
+    return halted;
+  }
+
+  public String getReprUrl() {
+    return reprUrl;
+  }
+  
+  private void setReprUrl(String urlString) {
+    this.reprUrl = urlString;
+    
+  }
+
+}

