Modified: nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Thu Jan 29 05:38:59 2015
@@ -30,7 +30,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.TreeMap;
 
-
 // Commons Logging imports
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,9 +65,9 @@ import org.apache.nutch.util.StringUtil;
 
 /**
  * Read utility for the CrawlDB.
- *
+ * 
  * @author Andrzej Bialecki
- *
+ * 
  */
 public class CrawlDbReader implements Closeable {
 
@@ -76,15 +75,18 @@ public class CrawlDbReader implements Cl
 
   private MapFile.Reader[] readers = null;
 
-  private void openReaders(String crawlDb, Configuration config) throws IOException {
-    if (readers != null) return;
+  private void openReaders(String crawlDb, Configuration config)
+      throws IOException {
+    if (readers != null)
+      return;
     FileSystem fs = FileSystem.get(config);
     readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb,
         CrawlDb.CURRENT_NAME), config);
   }
 
   private void closeReaders() {
-    if (readers == null) return;
+    if (readers == null)
+      return;
     for (int i = 0; i < readers.length; i++) {
       try {
         readers[i].close();
@@ -94,55 +96,61 @@ public class CrawlDbReader implements Cl
     }
   }
 
-  public static class CrawlDatumCsvOutputFormat extends FileOutputFormat<Text,CrawlDatum> {
-    protected static class LineRecordWriter implements RecordWriter<Text,CrawlDatum> {
+  public static class CrawlDatumCsvOutputFormat extends
+      FileOutputFormat<Text, CrawlDatum> {
+    protected static class LineRecordWriter implements
+        RecordWriter<Text, CrawlDatum> {
       private DataOutputStream out;
+
       public LineRecordWriter(DataOutputStream out) {
         this.out = out;
         try {
           out.writeBytes("Url,Status code,Status name,Fetch Time,Modified Time,Retries since fetch,Retry interval seconds,Retry interval days,Score,Signature,Metadata\n");
-        } catch (IOException e) {}
+        } catch (IOException e) {
+        }
       }
 
-      public synchronized void write(Text key, CrawlDatum value) throws IOException {
-          out.writeByte('"');
-          out.writeBytes(key.toString());
-          out.writeByte('"');
-          out.writeByte(',');
-          out.writeBytes(Integer.toString(value.getStatus()));
-          out.writeByte(',');
-          out.writeByte('"');
-          out.writeBytes(CrawlDatum.getStatusName(value.getStatus()));
-          out.writeByte('"');
-          out.writeByte(',');
-          out.writeBytes(new Date(value.getFetchTime()).toString());
-          out.writeByte(',');
-          out.writeBytes(new Date(value.getModifiedTime()).toString());
-          out.writeByte(',');
-          out.writeBytes(Integer.toString(value.getRetriesSinceFetch()));
-          out.writeByte(',');
-          out.writeBytes(Float.toString(value.getFetchInterval()));
-          out.writeByte(',');
-          out.writeBytes(Float.toString((value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY)));
-          out.writeByte(',');
-          out.writeBytes(Float.toString(value.getScore()));
-          out.writeByte(',');
-          out.writeByte('"');
-          out.writeBytes(value.getSignature() != null ? StringUtil.toHexString(value.getSignature()): "null");
-          out.writeByte('"');
-          out.writeByte(',');
-          out.writeByte('"');
-          if (value.getMetaData() != null) {
-            for (Entry<Writable, Writable> e : value.getMetaData().entrySet()) {
-              out.writeBytes(e.getKey().toString());
-              out.writeByte(':');
-              out.writeBytes(e.getValue().toString());
-              out.writeBytes("|||");
-            }
+      public synchronized void write(Text key, CrawlDatum value)
+          throws IOException {
+        out.writeByte('"');
+        out.writeBytes(key.toString());
+        out.writeByte('"');
+        out.writeByte(',');
+        out.writeBytes(Integer.toString(value.getStatus()));
+        out.writeByte(',');
+        out.writeByte('"');
+        out.writeBytes(CrawlDatum.getStatusName(value.getStatus()));
+        out.writeByte('"');
+        out.writeByte(',');
+        out.writeBytes(new Date(value.getFetchTime()).toString());
+        out.writeByte(',');
+        out.writeBytes(new Date(value.getModifiedTime()).toString());
+        out.writeByte(',');
+        out.writeBytes(Integer.toString(value.getRetriesSinceFetch()));
+        out.writeByte(',');
+        out.writeBytes(Float.toString(value.getFetchInterval()));
+        out.writeByte(',');
+        out.writeBytes(Float.toString((value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY)));
+        out.writeByte(',');
+        out.writeBytes(Float.toString(value.getScore()));
+        out.writeByte(',');
+        out.writeByte('"');
+        out.writeBytes(value.getSignature() != null ? StringUtil
+            .toHexString(value.getSignature()) : "null");
+        out.writeByte('"');
+        out.writeByte(',');
+        out.writeByte('"');
+        if (value.getMetaData() != null) {
+          for (Entry<Writable, Writable> e : value.getMetaData().entrySet()) {
+            out.writeBytes(e.getKey().toString());
+            out.writeByte(':');
+            out.writeBytes(e.getValue().toString());
+            out.writeBytes("|||");
           }
-          out.writeByte('"');
+        }
+        out.writeByte('"');
 
-          out.writeByte('\n');
+        out.writeByte('\n');
       }
 
       public synchronized void close(Reporter reporter) throws IOException {
@@ -150,42 +158,59 @@ public class CrawlDbReader implements Cl
       }
     }
 
-    public RecordWriter<Text,CrawlDatum> getRecordWriter(FileSystem fs, JobConf job, String name,
-        Progressable progress) throws IOException {
+    public RecordWriter<Text, CrawlDatum> getRecordWriter(FileSystem fs,
+        JobConf job, String name, Progressable progress) throws IOException {
       Path dir = FileOutputFormat.getOutputPath(job);
       DataOutputStream fileOut = fs.create(new Path(dir, name), progress);
       return new LineRecordWriter(fileOut);
-   }
+    }
   }
 
-  public static class CrawlDbStatMapper implements Mapper<Text, CrawlDatum, Text, LongWritable> {
+  public static class CrawlDbStatMapper implements
+      Mapper<Text, CrawlDatum, Text, LongWritable> {
     LongWritable COUNT_1 = new LongWritable(1);
     private boolean sort = false;
+
     public void configure(JobConf job) {
-      sort = job.getBoolean("db.reader.stats.sort", false );
+      sort = job.getBoolean("db.reader.stats.sort", false);
     }
-    public void close() {}
-    public void map(Text key, CrawlDatum value, OutputCollector<Text, LongWritable> output, Reporter reporter)
-            throws IOException {
+
+    public void close() {
+    }
+
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
       output.collect(new Text("T"), COUNT_1);
       output.collect(new Text("status " + value.getStatus()), COUNT_1);
-      output.collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
-      output.collect(new Text("s"), new LongWritable((long) (value.getScore() * 1000.0)));
-      if(sort){
+      output
+          .collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
+      output.collect(new Text("s"), new LongWritable(
+          (long) (value.getScore() * 1000.0)));
+      if (sort) {
         URL u = new URL(key.toString());
         String host = u.getHost();
-        output.collect(new Text("status " + value.getStatus() + " " + host), COUNT_1);
+        output.collect(new Text("status " + value.getStatus() + " " + host),
+            COUNT_1);
       }
     }
   }
 
-  public static class CrawlDbStatCombiner implements Reducer<Text, LongWritable, Text, LongWritable> {
+  public static class CrawlDbStatCombiner implements
+      Reducer<Text, LongWritable, Text, LongWritable> {
     LongWritable val = new LongWritable();
 
-    public CrawlDbStatCombiner() { }
-    public void configure(JobConf job) { }
-    public void close() {}
-    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
+    public CrawlDbStatCombiner() {
+    }
+
+    public void configure(JobConf job) {
+    }
+
+    public void close() {
+    }
+
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
         throws IOException {
       val.set(0L);
       String k = key.toString();
@@ -201,8 +226,10 @@ public class CrawlDbReader implements Cl
         long max = Long.MIN_VALUE;
         while (values.hasNext()) {
           LongWritable cnt = values.next();
-          if (cnt.get() < min) min = cnt.get();
-          if (cnt.get() > max) max = cnt.get();
+          if (cnt.get() < min)
+            min = cnt.get();
+          if (cnt.get() > max)
+            max = cnt.get();
           total += cnt.get();
         }
         output.collect(new Text("scn"), new LongWritable(min));
@@ -212,11 +239,17 @@ public class CrawlDbReader implements Cl
     }
   }
 
-  public static class CrawlDbStatReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
-    public void configure(JobConf job) {}
-    public void close() {}
-    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
-            throws IOException {
+  public static class CrawlDbStatReducer implements
+      Reducer<Text, LongWritable, Text, LongWritable> {
+    public void configure(JobConf job) {
+    }
+
+    public void close() {
+    }
+
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
 
       String k = key.toString();
       if (k.equals("T")) {
@@ -238,14 +271,16 @@ public class CrawlDbReader implements Cl
         LongWritable cnt = new LongWritable(Long.MIN_VALUE);
         while (values.hasNext()) {
           LongWritable val = values.next();
-          if (cnt.get() < val.get()) cnt.set(val.get());
+          if (cnt.get() < val.get())
+            cnt.set(val.get());
         }
         output.collect(key, cnt);
       } else if (k.equals("scn")) {
         LongWritable cnt = new LongWritable(Long.MAX_VALUE);
         while (values.hasNext()) {
           LongWritable val = values.next();
-          if (cnt.get() > val.get()) cnt.set(val.get());
+          if (cnt.get() > val.get())
+            cnt.set(val.get());
         }
         output.collect(key, cnt);
       } else if (k.equals("sct")) {
@@ -259,27 +294,36 @@ public class CrawlDbReader implements Cl
     }
   }
 
-  public static class CrawlDbTopNMapper implements Mapper<Text, CrawlDatum, FloatWritable, Text> {
+  public static class CrawlDbTopNMapper implements
+      Mapper<Text, CrawlDatum, FloatWritable, Text> {
     private static final FloatWritable fw = new FloatWritable();
     private float min = 0.0f;
 
     public void configure(JobConf job) {
       min = job.getFloat("db.reader.topn.min", 0.0f);
     }
-    public void close() {}
-    public void map(Text key, CrawlDatum value, OutputCollector<FloatWritable, Text> output, Reporter reporter)
-            throws IOException {
-      if (value.getScore() < min) return; // don't collect low-scoring records
+
+    public void close() {
+    }
+
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<FloatWritable, Text> output, Reporter reporter)
+        throws IOException {
+      if (value.getScore() < min)
+        return; // don't collect low-scoring records
       fw.set(-value.getScore()); // reverse sorting order
       output.collect(fw, key); // invert mapping: score -> url
     }
   }
 
-  public static class CrawlDbTopNReducer implements Reducer<FloatWritable, Text, FloatWritable, Text> {
+  public static class CrawlDbTopNReducer implements
+      Reducer<FloatWritable, Text, FloatWritable, Text> {
     private long topN;
     private long count = 0L;
 
-    public void reduce(FloatWritable key, Iterator<Text> values, OutputCollector<FloatWritable, Text> output, Reporter reporter) throws IOException {
+    public void reduce(FloatWritable key, Iterator<Text> values,
+        OutputCollector<FloatWritable, Text> output, Reporter reporter)
+        throws IOException {
       while (values.hasNext() && count < topN) {
         key.set(-key.get());
         output.collect(key, values.next());
@@ -291,14 +335,16 @@ public class CrawlDbReader implements Cl
       topN = job.getLong("db.reader.topn", 100) / job.getNumReduceTasks();
     }
 
-    public void close() {}
+    public void close() {
+    }
   }
 
   public void close() {
     closeReaders();
   }
 
-  public void processStatJob(String crawlDb, Configuration config, boolean sort) throws IOException {
+  public void processStatJob(String crawlDb, Configuration config, boolean sort)
+      throws IOException {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb statistics start: " + crawlDb);
@@ -329,7 +375,8 @@ public class CrawlDbReader implements Cl
 
     // reading the result
     FileSystem fileSystem = FileSystem.get(config);
-    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config, tmpFolder);
+    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config,
+        tmpFolder);
 
     Text key = new Text();
     LongWritable value = new LongWritable();
@@ -342,14 +389,18 @@ public class CrawlDbReader implements Cl
         LongWritable val = stats.get(k);
         if (val == null) {
           val = new LongWritable();
-          if (k.equals("scx")) val.set(Long.MIN_VALUE);
-          if (k.equals("scn")) val.set(Long.MAX_VALUE);
+          if (k.equals("scx"))
+            val.set(Long.MIN_VALUE);
+          if (k.equals("scn"))
+            val.set(Long.MAX_VALUE);
           stats.put(k, val);
         }
         if (k.equals("scx")) {
-          if (val.get() < value.get()) val.set(value.get());
+          if (val.get() < value.get())
+            val.set(value.get());
         } else if (k.equals("scn")) {
-          if (val.get() > value.get()) val.set(value.get());
+          if (val.get() > value.get())
+            val.set(value.get());
         } else {
           val.set(val.get() + value.get());
         }
@@ -370,31 +421,40 @@ public class CrawlDbReader implements Cl
         } else if (k.equals("scx")) {
           LOG.info("max score:\t" + (val.get() / 1000.0f));
         } else if (k.equals("sct")) {
-          LOG.info("avg score:\t" + (float) ((((double)val.get()) / totalCnt.get()) / 1000.0));
+          LOG.info("avg score:\t"
+              + (float) ((((double) val.get()) / totalCnt.get()) / 1000.0));
         } else if (k.startsWith("status")) {
           String[] st = k.split(" ");
           int code = Integer.parseInt(st[1]);
-          if(st.length >2 ) LOG.info("   " + st[2] +" :\t" + val);
-          else LOG.info(st[0] +" " +code + " (" + CrawlDatum.getStatusName((byte) code) + "):\t" + val);
-        } else LOG.info(k + ":\t" + val);
+          if (st.length > 2)
+            LOG.info("   " + st[2] + " :\t" + val);
+          else
+            LOG.info(st[0] + " " + code + " ("
+                + CrawlDatum.getStatusName((byte) code) + "):\t" + val);
+        } else
+          LOG.info(k + ":\t" + val);
       }
     }
     // removing the tmp folder
     fileSystem.delete(tmpFolder, true);
-    if (LOG.isInfoEnabled()) { LOG.info("CrawlDb statistics: done"); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb statistics: done");
+    }
 
   }
 
-  public CrawlDatum get(String crawlDb, String url, Configuration config) throws IOException {
+  public CrawlDatum get(String crawlDb, String url, Configuration config)
+      throws IOException {
     Text key = new Text(url);
     CrawlDatum val = new CrawlDatum();
     openReaders(crawlDb, config);
-    CrawlDatum res = (CrawlDatum)MapFileOutputFormat.getEntry(readers,
+    CrawlDatum res = (CrawlDatum) MapFileOutputFormat.getEntry(readers,
         new HashPartitioner<Text, CrawlDatum>(), key, val);
     return res;
   }
 
-  public void readUrl(String crawlDb, String url, Configuration config) throws IOException {
+  public void readUrl(String crawlDb, String url, Configuration config)
+      throws IOException {
     CrawlDatum res = get(crawlDb, url, config);
     System.out.println("URL: " + url);
     if (res != null) {
@@ -404,7 +464,9 @@ public class CrawlDbReader implements Cl
     }
   }
 
-  public void processDumpJob(String crawlDb, String output, Configuration config, String format, String regex, String status, Integer retry) throws IOException {
+  public void processDumpJob(String crawlDb, String output,
+      Configuration config, String format, String regex, String status,
+      Integer retry) throws IOException {
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb dump: starting");
       LOG.info("CrawlDb db: " + crawlDb);
@@ -421,26 +483,31 @@ public class CrawlDbReader implements Cl
 
     if (format.equals("csv")) {
       job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
-    }
-    else if (format.equals("crawldb")) {
+    } else if (format.equals("crawldb")) {
       job.setOutputFormat(MapFileOutputFormat.class);
     } else {
       job.setOutputFormat(TextOutputFormat.class);
     }
 
-    if (status != null) job.set("status", status);
-    if (regex != null) job.set("regex", regex);
-    if (retry != null) job.setInt("retry", retry);
-    
+    if (status != null)
+      job.set("status", status);
+    if (regex != null)
+      job.set("regex", regex);
+    if (retry != null)
+      job.setInt("retry", retry);
+
     job.setMapperClass(CrawlDbDumpMapper.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(CrawlDatum.class);
 
     JobClient.runJob(job);
-    if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb dump: done");
+    }
   }
 
-  public static class CrawlDbDumpMapper implements Mapper<Text, CrawlDatum, Text, CrawlDatum> {
+  public static class CrawlDbDumpMapper implements
+      Mapper<Text, CrawlDatum, Text, CrawlDatum> {
     Pattern pattern = null;
     Matcher matcher = null;
     String status = null;
@@ -454,10 +521,13 @@ public class CrawlDbReader implements Cl
       retry = job.getInt("retry", -1);
     }
 
-    public void close() {}
-    public void map(Text key, CrawlDatum value, OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-            throws IOException {
-            
+    public void close() {
+    }
+
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+
       // check retry
       if (retry != -1) {
         if (value.getRetriesSinceFetch() < retry) {
@@ -467,7 +537,9 @@ public class CrawlDbReader implements Cl
 
       // check status
       if (status != null
-        && !status.equalsIgnoreCase(CrawlDatum.getStatusName(value.getStatus()))) return;
+          && !status.equalsIgnoreCase(CrawlDatum.getStatusName(value
+              .getStatus())))
+        return;
 
       // check regex
       if (pattern != null) {
@@ -481,7 +553,8 @@ public class CrawlDbReader implements Cl
     }
   }
 
-  public void processTopNJob(String crawlDb, long topN, float min, String output, Configuration config) throws IOException {
+  public void processTopNJob(String crawlDb, long topN, float min,
+      String output, Configuration config) throws IOException {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
@@ -489,10 +562,9 @@ public class CrawlDbReader implements Cl
     }
 
     Path outFolder = new Path(output);
-    Path tempDir =
-      new Path(config.get("mapred.temp.dir", ".") +
-               "/readdb-topN-temp-"+
-               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    Path tempDir = new Path(config.get("mapred.temp.dir", ".")
+        + "/readdb-topN-temp-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
     JobConf job = new NutchJob(config);
     job.setJobName("topN prepare " + crawlDb);
@@ -531,7 +603,9 @@ public class CrawlDbReader implements Cl
     JobClient.runJob(job);
     FileSystem fs = FileSystem.get(config);
     fs.delete(tempDir, true);
-    if (LOG.isInfoEnabled()) { LOG.info("CrawlDb topN: done"); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb topN: done");
+    }
 
   }
 
@@ -540,20 +614,29 @@ public class CrawlDbReader implements Cl
     CrawlDbReader dbr = new CrawlDbReader();
 
     if (args.length < 2) {
-      System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
-      System.err.println("\t<crawldb>\tdirectory name where crawldb is located");
-      System.err.println("\t-stats [-sort] \tprint overall statistics to System.out");
+      System.err
+          .println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
+      System.err
+          .println("\t<crawldb>\tdirectory name where crawldb is located");
+      System.err
+          .println("\t-stats [-sort] \tprint overall statistics to System.out");
       System.err.println("\t\t[-sort]\tlist status sorted by host");
-      System.err.println("\t-dump <out_dir> [-format normal|csv|crawldb]\tdump the whole db to a text file in <out_dir>");
+      System.err
+          .println("\t-dump <out_dir> [-format normal|csv|crawldb]\tdump the whole db to a text file in <out_dir>");
       System.err.println("\t\t[-format csv]\tdump in Csv format");
-      System.err.println("\t\t[-format normal]\tdump in standard format (default option)");
+      System.err
+          .println("\t\t[-format normal]\tdump in standard format (default option)");
       System.err.println("\t\t[-format crawldb]\tdump as CrawlDB");
       System.err.println("\t\t[-regex <expr>]\tfilter records with expression");
       System.err.println("\t\t[-retry <num>]\tminimum retry count");
-      System.err.println("\t\t[-status <status>]\tfilter records by CrawlDatum status");
-      System.err.println("\t-url <url>\tprint information on <url> to System.out");
-      System.err.println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
-      System.err.println("\t\t[<min>]\tskip records with scores below this value.");
+      System.err
+          .println("\t\t[-status <status>]\tfilter records by CrawlDatum status");
+      System.err
+          .println("\t-url <url>\tprint information on <url> to System.out");
+      System.err
+          .println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
+      System.err
+          .println("\t\t[<min>]\tskip records with scores below this value.");
       System.err.println("\t\t\tThis can significantly improve performance.");
       return;
     }
@@ -563,7 +646,7 @@ public class CrawlDbReader implements Cl
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-stats")) {
         boolean toSort = false;
-        if(i < args.length - 1 && "-sort".equals(args[i+1])){
+        if (i < args.length - 1 && "-sort".equals(args[i + 1])) {
           toSort = true;
           i++;
         }
@@ -577,19 +660,19 @@ public class CrawlDbReader implements Cl
         for (int j = i + 1; j < args.length; j++) {
           if (args[j].equals("-format")) {
             format = args[++j];
-            i=i+2;
+            i = i + 2;
           }
           if (args[j].equals("-regex")) {
             regex = args[++j];
-            i=i+2;
+            i = i + 2;
           }
           if (args[j].equals("-retry")) {
             retry = Integer.parseInt(args[++j]);
-            i=i+2;
+            i = i + 2;
           }
           if (args[j].equals("-status")) {
             status = args[++j];
-            i=i+2;
+            i = i + 2;
           }
         }
         dbr.processDumpJob(crawlDb, param, conf, format, regex, status, retry);

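For reference, the reader methods touched above can also be driven programmatically rather than through the command line shown in main(). A minimal sketch, assuming the Nutch 1.x API as it appears in this diff (the crawldb path and the URL are hypothetical examples):

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlDbReader;
import org.apache.nutch.util.NutchConfiguration;

public class CrawlDbReaderExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    CrawlDbReader dbr = new CrawlDbReader();
    try {
      // overall CrawlDb statistics; the boolean flag enables per-host sorting
      dbr.processStatJob("crawl/crawldb", conf, false);
      // print the stored CrawlDatum for a single URL
      dbr.readUrl("crawl/crawldb", "http://example.com/", conf);
    } finally {
      dbr.close(); // releases the MapFile readers opened lazily by readUrl()
    }
  }
}
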
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java Thu Jan 29 05:38:59 2015
@@ -35,9 +35,11 @@ import org.apache.nutch.scoring.ScoringF
 import org.apache.nutch.scoring.ScoringFilters;
 
 /** Merge new page entries with existing entries. */
-public class CrawlDbReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
-  public static final Logger LOG = LoggerFactory.getLogger(CrawlDbReducer.class);
-  
+public class CrawlDbReducer implements
+    Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+  public static final Logger LOG = LoggerFactory
+      .getLogger(CrawlDbReducer.class);
+
   private int retryMax;
   private CrawlDatum result = new CrawlDatum();
   private InlinkPriorityQueue linked = null;
@@ -50,17 +52,18 @@ public class CrawlDbReducer implements R
     retryMax = job.getInt("db.fetch.retry.max", 3);
     scfilters = new ScoringFilters(job);
     additionsAllowed = job.getBoolean(CrawlDb.CRAWLDB_ADDITIONS_ALLOWED, true);
-    maxInterval = job.getInt("db.fetch.interval.max", 0 );
+    maxInterval = job.getInt("db.fetch.interval.max", 0);
     schedule = FetchScheduleFactory.getFetchSchedule(job);
     int maxLinks = job.getInt("db.update.max.inlinks", 10000);
     linked = new InlinkPriorityQueue(maxLinks);
   }
 
-  public void close() {}
+  public void close() {
+  }
 
   public void reduce(Text key, Iterator<CrawlDatum> values,
-                     OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-    throws IOException {
+      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+      throws IOException {
 
     CrawlDatum fetch = new CrawlDatum();
     CrawlDatum old = new CrawlDatum();
@@ -71,10 +74,11 @@ public class CrawlDbReducer implements R
     boolean multiple = false; // avoid deep copy when only single value exists
     linked.clear();
     org.apache.hadoop.io.MapWritable metaFromParse = null;
-    
+
     while (values.hasNext()) {
       CrawlDatum datum = values.next();
-      if (!multiple && values.hasNext()) multiple = true;
+      if (!multiple && values.hasNext())
+        multiple = true;
       if (CrawlDatum.hasDbStatus(datum)) {
         if (!oldSet) {
           if (multiple) {
@@ -86,7 +90,8 @@ public class CrawlDbReducer implements R
           oldSet = true;
         } else {
           // always take the latest version
-          if (old.getFetchTime() < datum.getFetchTime()) old.set(datum);
+          if (old.getFetchTime() < datum.getFetchTime())
+            old.set(datum);
         }
         continue;
       }
@@ -101,12 +106,13 @@ public class CrawlDbReducer implements R
           fetchSet = true;
         } else {
           // always take the latest version
-          if (fetch.getFetchTime() < datum.getFetchTime()) fetch.set(datum);
+          if (fetch.getFetchTime() < datum.getFetchTime())
+            fetch.set(datum);
         }
         continue;
       }
 
-      switch (datum.getStatus()) {                // collect other info
+      switch (datum.getStatus()) { // collect other info
       case CrawlDatum.STATUS_LINKED:
         CrawlDatum link;
         if (multiple) {
@@ -127,7 +133,7 @@ public class CrawlDbReducer implements R
         LOG.warn("Unknown status, key: " + key + ", datum: " + datum);
       }
     }
-    
+
     // copy the content of the queue into a List
     // in reversed order
     int numLinks = linked.size();
@@ -135,28 +141,31 @@ public class CrawlDbReducer implements R
     for (int i = numLinks - 1; i >= 0; i--) {
       linkList.add(linked.pop());
     }
-    
+
     // if it doesn't already exist, skip it
-    if (!oldSet && !additionsAllowed) return;
-    
+    if (!oldSet && !additionsAllowed)
+      return;
+
     // if there is no fetched datum, perhaps there is a link
     if (!fetchSet && linkList.size() > 0) {
       fetch = linkList.get(0);
       fetchSet = true;
     }
-    
+
     // still no new data - record only unchanged old data, if exists, and return
     if (!fetchSet) {
       if (oldSet) {// at this point at least "old" should be present
         output.collect(key, old);
-        reporter.getCounter("CrawlDB status", CrawlDatum.getStatusName(old.getStatus())).increment(1);
+        reporter.getCounter("CrawlDB status",
+            CrawlDatum.getStatusName(old.getStatus())).increment(1);
       } else {
         LOG.warn("Missing fetch and old value, signature=" + signature);
       }
       return;
     }
-    
-    if (signature == null) signature = fetch.getSignature();
+
+    if (signature == null)
+      signature = fetch.getSignature();
     long prevModifiedTime = oldSet ? old.getModifiedTime() : 0L;
     long prevFetchTime = oldSet ? old.getFetchTime() : 0L;
 
@@ -175,12 +184,12 @@ public class CrawlDbReducer implements R
         result.setModifiedTime(old.getModifiedTime());
       }
     }
-    
-    switch (fetch.getStatus()) {                // determine new status
 
-    case CrawlDatum.STATUS_LINKED:                // it was link
-      if (oldSet) {                          // if old exists
-        result.set(old);                          // use it
+    switch (fetch.getStatus()) { // determine new status
+
+    case CrawlDatum.STATUS_LINKED: // it was link
+      if (oldSet) { // if old exists
+        result.set(old); // use it
       } else {
         result = schedule.initializeSchedule(key, result);
         result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
@@ -188,18 +197,18 @@ public class CrawlDbReducer implements R
           scfilters.initialScore(key, result);
         } catch (ScoringFilterException e) {
           if (LOG.isWarnEnabled()) {
-            LOG.warn("Cannot filter init score for url " + key +
-                     ", using default: " + e.getMessage());
+            LOG.warn("Cannot filter init score for url " + key
+                + ", using default: " + e.getMessage());
           }
           result.setScore(0.0f);
         }
       }
       break;
-      
-    case CrawlDatum.STATUS_FETCH_SUCCESS:         // succesful fetch
-    case CrawlDatum.STATUS_FETCH_REDIR_TEMP:      // successful fetch, redirected
+
+    case CrawlDatum.STATUS_FETCH_SUCCESS: // succesful fetch
+    case CrawlDatum.STATUS_FETCH_REDIR_TEMP: // successful fetch, redirected
     case CrawlDatum.STATUS_FETCH_REDIR_PERM:
-    case CrawlDatum.STATUS_FETCH_NOTMODIFIED:     // successful fetch, notmodified
+    case CrawlDatum.STATUS_FETCH_NOTMODIFIED: // successful fetch, notmodified
       // determine the modification status
       int modified = FetchSchedule.STATUS_UNKNOWN;
       if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
@@ -217,15 +226,18 @@ public class CrawlDbReducer implements R
       }
       // set the schedule
       result = schedule.setFetchSchedule(key, result, prevFetchTime,
-          prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(), modified);
+          prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(),
+          modified);
       // set the result status and signature
       if (modified == FetchSchedule.STATUS_NOTMODIFIED) {
         result.setStatus(CrawlDatum.STATUS_DB_NOTMODIFIED);
 
-        // NUTCH-1341 The page is not modified according to its signature, let's reset lastModified as well
+        // NUTCH-1341 The page is not modified according to its signature, let's
+        // reset lastModified as well
         result.setModifiedTime(prevModifiedTime);
 
-        if (oldSet) result.setSignature(old.getSignature());
+        if (oldSet)
+          result.setSignature(old.getSignature());
       } else {
         switch (fetch.getStatus()) {
         case CrawlDatum.STATUS_FETCH_SUCCESS:
@@ -238,9 +250,12 @@ public class CrawlDbReducer implements R
           result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP);
           break;
         default:
-          LOG.warn("Unexpected status: " + fetch.getStatus() + " resetting to old status.");
-          if (oldSet) result.setStatus(old.getStatus());
-          else result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
+          LOG.warn("Unexpected status: " + fetch.getStatus()
+              + " resetting to old status.");
+          if (oldSet)
+            result.setStatus(old.getStatus());
+          else
+            result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
         }
         result.setSignature(signature);
       }
@@ -262,11 +277,11 @@ public class CrawlDbReducer implements R
     case CrawlDatum.STATUS_SIGNATURE:
       if (LOG.isWarnEnabled()) {
         LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key);
-      }   
+      }
       return;
-    case CrawlDatum.STATUS_FETCH_RETRY:           // temporary failure
+    case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure
       if (oldSet) {
-        result.setSignature(old.getSignature());  // use old signature
+        result.setSignature(old.getSignature()); // use old signature
       }
       result = schedule.setPageRetrySchedule(key, result, prevFetchTime,
           prevModifiedTime, fetch.getFetchTime());
@@ -275,20 +290,21 @@ public class CrawlDbReducer implements R
       } else {
         result.setStatus(CrawlDatum.STATUS_DB_GONE);
         result = schedule.setPageGoneSchedule(key, result, prevFetchTime,
-          prevModifiedTime, fetch.getFetchTime());
+            prevModifiedTime, fetch.getFetchTime());
       }
       break;
 
-    case CrawlDatum.STATUS_FETCH_GONE:            // permanent failure
+    case CrawlDatum.STATUS_FETCH_GONE: // permanent failure
       if (oldSet)
-        result.setSignature(old.getSignature());  // use old signature
+        result.setSignature(old.getSignature()); // use old signature
       result.setStatus(CrawlDatum.STATUS_DB_GONE);
       result = schedule.setPageGoneSchedule(key, result, prevFetchTime,
           prevModifiedTime, fetch.getFetchTime());
       break;
 
     default:
-      throw new RuntimeException("Unknown status: " + fetch.getStatus() + " " + key);
+      throw new RuntimeException("Unknown status: " + fetch.getStatus() + " "
+          + key);
     }
 
     try {
@@ -301,22 +317,23 @@ public class CrawlDbReducer implements R
     // remove generation time, if any
     result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
     output.collect(key, result);
-    reporter.getCounter("CrawlDB status", CrawlDatum.getStatusName(result.getStatus())).increment(1);
+    reporter.getCounter("CrawlDB status",
+        CrawlDatum.getStatusName(result.getStatus())).increment(1);
   }
-  
+
 }
 
 class InlinkPriorityQueue extends PriorityQueue<CrawlDatum> {
-  
+
   public InlinkPriorityQueue(int maxSize) {
     initialize(maxSize);
   }
-  
+
   /** Determines the ordering of objects in this priority queue. **/
   protected boolean lessThan(Object arg0, Object arg1) {
     CrawlDatum candidate = (CrawlDatum) arg0;
     CrawlDatum least = (CrawlDatum) arg1;
     return candidate.getScore() > least.getScore();
   }
-  
+
 }

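The reduce() logic above repeatedly applies one rule when several db or fetch entries arrive for the same URL: the datum with the greatest fetch time wins. A stand-alone sketch of that rule, for illustration only (only CrawlDatum is a real Nutch class here; the helper is hypothetical):

import java.util.Iterator;
import org.apache.nutch.crawl.CrawlDatum;

class LatestDatumSelector {
  // Returns a copy of the value with the greatest fetch time, or null if the iterator is empty.
  static CrawlDatum latest(Iterator<CrawlDatum> values) {
    CrawlDatum latest = null;
    while (values.hasNext()) {
      CrawlDatum datum = values.next();
      if (latest == null || latest.getFetchTime() < datum.getFetchTime()) {
        // copy rather than keep the reference: Hadoop reuses the value object
        latest = new CrawlDatum();
        latest.set(datum);
      }
    }
    return latest;
  }
}
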
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java Thu Jan 29 05:38:59 2015
@@ -54,245 +54,244 @@ import org.slf4j.LoggerFactory;
  * all of them as duplicate except the one with the highest score (based on the
  * score in the crawldb, which is not necessarily the same as the score
  * indexed). If two (or more) documents have the same score, then the document
- * with the latest timestamp is kept. If the documents have the same timestamp 
- * then the one with the shortest URL is kept. The documents marked as duplicate can then
- * be deleted with the command CleaningJob.
+ * with the latest timestamp is kept. If the documents have the same timestamp
+ * then the one with the shortest URL is kept. The documents marked as duplicate
+ * can then be deleted with the command CleaningJob.
  ***/
 public class DeduplicationJob extends Configured implements Tool {
 
-    public static final Logger LOG = LoggerFactory
-            .getLogger(DeduplicationJob.class);
+  public static final Logger LOG = LoggerFactory
+      .getLogger(DeduplicationJob.class);
 
-    private final static Text urlKey = new Text("_URLTEMPKEY_");
+  private final static Text urlKey = new Text("_URLTEMPKEY_");
 
-    public static class DBFilter implements
-            Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum> {
+  public static class DBFilter implements
+      Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum> {
 
-        @Override
-        public void configure(JobConf arg0) {
-        }
+    @Override
+    public void configure(JobConf arg0) {
+    }
 
-        @Override
-        public void close() throws IOException {
-        }
+    @Override
+    public void close() throws IOException {
+    }
 
-        @Override
-        public void map(Text key, CrawlDatum value,
-                OutputCollector<BytesWritable, CrawlDatum> output,
-                Reporter reporter) throws IOException {
-
-            if (value.getStatus() == CrawlDatum.STATUS_DB_FETCHED
-                    || value.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
-                // || value.getStatus() ==CrawlDatum.STATUS_DB_GONE){
-                byte[] signature = value.getSignature();
-                if (signature == null) return;
-                BytesWritable sig = new BytesWritable(signature);
-                // add the URL as a temporary MD
-                value.getMetaData().put(urlKey, key);
-                // reduce on the signature
-                output.collect(sig, value);
-            }
-        }
+    @Override
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<BytesWritable, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+
+      if (value.getStatus() == CrawlDatum.STATUS_DB_FETCHED
+          || value.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+        // || value.getStatus() ==CrawlDatum.STATUS_DB_GONE){
+        byte[] signature = value.getSignature();
+        if (signature == null)
+          return;
+        BytesWritable sig = new BytesWritable(signature);
+        // add the URL as a temporary MD
+        value.getMetaData().put(urlKey, key);
+        // reduce on the signature
+        output.collect(sig, value);
+      }
     }
+  }
 
-    public static class DedupReducer implements
-            Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum> {
+  public static class DedupReducer implements
+      Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum> {
 
-        private void writeOutAsDuplicate(CrawlDatum datum,
-                OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-                throws IOException {
-            datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);
-            Text key = (Text) datum.getMetaData().remove(urlKey);
-            reporter.incrCounter("DeduplicationJobStatus",
-                    "Documents marked as duplicate", 1);
-            output.collect(key, datum);
-        }
+    private void writeOutAsDuplicate(CrawlDatum datum,
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+      datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);
+      Text key = (Text) datum.getMetaData().remove(urlKey);
+      reporter.incrCounter("DeduplicationJobStatus",
+          "Documents marked as duplicate", 1);
+      output.collect(key, datum);
+    }
 
-        @Override
-        public void reduce(BytesWritable key, Iterator<CrawlDatum> values,
-                OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-                throws IOException {
-            CrawlDatum existingDoc = null;
-
-            while (values.hasNext()) {
-                if (existingDoc == null) {
-                    existingDoc = new CrawlDatum();
-                    existingDoc.set(values.next());
-                    continue;
-                }
-                CrawlDatum newDoc = values.next();
-                // compare based on score
-                if (existingDoc.getScore() < newDoc.getScore()) {
-                    writeOutAsDuplicate(existingDoc, output, reporter);
-                    existingDoc = new CrawlDatum();
-                    existingDoc.set(newDoc);
-                    continue;
-                } else if (existingDoc.getScore() > newDoc.getScore()) {
-                    // mark new one as duplicate
-                    writeOutAsDuplicate(newDoc, output, reporter);
-                    continue;
-                }
-                // same score? delete the one which is oldest
-                if (existingDoc.getFetchTime() > newDoc.getFetchTime()) {
-                    // mark new one as duplicate
-                    writeOutAsDuplicate(newDoc, output, reporter);
-                    continue;
-                } else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) {
-                    // mark existing one as duplicate
-                    writeOutAsDuplicate(existingDoc, output, reporter);
-                    existingDoc = new CrawlDatum();
-                    existingDoc.set(newDoc);
-                    continue;
-                }
-                // same time? keep the one which has the shortest URL
-                String urlExisting = existingDoc.getMetaData().get(urlKey).toString();
-                String urlnewDoc = newDoc.getMetaData().get(urlKey).toString();
-                if (urlExisting.length()<urlnewDoc.length()){
-                  // mark new one as duplicate
-                  writeOutAsDuplicate(newDoc, output, reporter);
-                  continue;
-                }
-                else if (urlExisting.length()>urlnewDoc.length()){
-                  // mark existing one as duplicate
-                  writeOutAsDuplicate(existingDoc, output, reporter);
-                  existingDoc = new CrawlDatum();
-                  existingDoc.set(newDoc);
-                  continue;
-                }
-            }
+    @Override
+    public void reduce(BytesWritable key, Iterator<CrawlDatum> values,
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+      CrawlDatum existingDoc = null;
+
+      while (values.hasNext()) {
+        if (existingDoc == null) {
+          existingDoc = new CrawlDatum();
+          existingDoc.set(values.next());
+          continue;
+        }
+        CrawlDatum newDoc = values.next();
+        // compare based on score
+        if (existingDoc.getScore() < newDoc.getScore()) {
+          writeOutAsDuplicate(existingDoc, output, reporter);
+          existingDoc = new CrawlDatum();
+          existingDoc.set(newDoc);
+          continue;
+        } else if (existingDoc.getScore() > newDoc.getScore()) {
+          // mark new one as duplicate
+          writeOutAsDuplicate(newDoc, output, reporter);
+          continue;
+        }
+        // same score? delete the one which is oldest
+        if (existingDoc.getFetchTime() > newDoc.getFetchTime()) {
+          // mark new one as duplicate
+          writeOutAsDuplicate(newDoc, output, reporter);
+          continue;
+        } else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) {
+          // mark existing one as duplicate
+          writeOutAsDuplicate(existingDoc, output, reporter);
+          existingDoc = new CrawlDatum();
+          existingDoc.set(newDoc);
+          continue;
+        }
+        // same time? keep the one which has the shortest URL
+        String urlExisting = existingDoc.getMetaData().get(urlKey).toString();
+        String urlnewDoc = newDoc.getMetaData().get(urlKey).toString();
+        if (urlExisting.length() < urlnewDoc.length()) {
+          // mark new one as duplicate
+          writeOutAsDuplicate(newDoc, output, reporter);
+          continue;
+        } else if (urlExisting.length() > urlnewDoc.length()) {
+          // mark existing one as duplicate
+          writeOutAsDuplicate(existingDoc, output, reporter);
+          existingDoc = new CrawlDatum();
+          existingDoc.set(newDoc);
+          continue;
         }
+      }
+    }
 
-        @Override
-        public void configure(JobConf arg0) {
-        }
+    @Override
+    public void configure(JobConf arg0) {
+    }
 
-        @Override
-        public void close() throws IOException {
+    @Override
+    public void close() throws IOException {
 
-        }
     }
+  }
 
-    /** Combine multiple new entries for a url. */
-    public static class StatusUpdateReducer implements
-            Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+  /** Combine multiple new entries for a url. */
+  public static class StatusUpdateReducer implements
+      Reducer<Text, CrawlDatum, Text, CrawlDatum> {
 
-        public void configure(JobConf job) {
-        }
+    public void configure(JobConf job) {
+    }
 
-        public void close() {
-        }
+    public void close() {
+    }
 
-        private CrawlDatum old = new CrawlDatum();
-        private CrawlDatum duplicate = new CrawlDatum();
+    private CrawlDatum old = new CrawlDatum();
+    private CrawlDatum duplicate = new CrawlDatum();
 
-        public void reduce(Text key, Iterator<CrawlDatum> values,
-                OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-                throws IOException {
-            boolean duplicateSet = false;
-            
-            while (values.hasNext()) {
-                CrawlDatum val = values.next();
-                if (val.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
-                    duplicate.set(val);
-                    duplicateSet = true;
-                } else {
-                    old.set(val);
-                }
-            }
-
-            // keep the duplicate if there is one
-            if (duplicateSet) {
-                output.collect(key, duplicate);
-                return;
-            }
+    public void reduce(Text key, Iterator<CrawlDatum> values,
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+      boolean duplicateSet = false;
+
+      while (values.hasNext()) {
+        CrawlDatum val = values.next();
+        if (val.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+          duplicate.set(val);
+          duplicateSet = true;
+        } else {
+          old.set(val);
+        }
+      }
+
+      // keep the duplicate if there is one
+      if (duplicateSet) {
+        output.collect(key, duplicate);
+        return;
+      }
 
-            // no duplicate? keep old one then
-            output.collect(key, old);
-        }
+      // no duplicate? keep old one then
+      output.collect(key, old);
     }
+  }
 
-    public int run(String[] args) throws IOException {
-        if (args.length < 1) {
-            System.err.println("Usage: DeduplicationJob <crawldb>");
-            return 1;
-        }
+  public int run(String[] args) throws IOException {
+    if (args.length < 1) {
+      System.err.println("Usage: DeduplicationJob <crawldb>");
+      return 1;
+    }
 
-        String crawldb = args[0];
+    String crawldb = args[0];
 
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        long start = System.currentTimeMillis();
-        LOG.info("DeduplicationJob: starting at " + sdf.format(start));
-
-        Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
-                + "/dedup-temp-"
-                + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
-        JobConf job = new NutchJob(getConf());
-        
-        job.setJobName("Deduplication on "+crawldb);
-
-        FileInputFormat.addInputPath(job, new Path(crawldb,
-                CrawlDb.CURRENT_NAME));
-        job.setInputFormat(SequenceFileInputFormat.class);
-
-        FileOutputFormat.setOutputPath(job, tempDir);
-        job.setOutputFormat(SequenceFileOutputFormat.class);
-
-        job.setMapOutputKeyClass(BytesWritable.class);
-        job.setMapOutputValueClass(CrawlDatum.class);
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(CrawlDatum.class);
-
-        job.setMapperClass(DBFilter.class);
-        job.setReducerClass(DedupReducer.class);
-
-        try {
-            RunningJob rj = JobClient.runJob(job);
-            Group g = rj.getCounters().getGroup("DeduplicationJobStatus");
-            if (g != null){
-              long dups = g.getCounter("Documents marked as duplicate");
-              LOG.info("Deduplication: "+(int)dups+" documents marked as duplicates");
-            }
-        } catch (final Exception e) {
-            LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e));
-            return -1;
-        }
-        
-        // merge with existing crawl db
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Deduplication: Updating status of duplicate urls into crawl db.");
-        }
-
-        Path dbPath = new Path(crawldb);
-        JobConf mergeJob = CrawlDb.createJob(getConf(), dbPath);
-        FileInputFormat.addInputPath(mergeJob, tempDir);
-        mergeJob.setReducerClass(StatusUpdateReducer.class);
-
-        try {
-            JobClient.runJob(mergeJob);
-        } catch (final Exception e) {
-            LOG.error("DeduplicationMergeJob: "
-                    + StringUtils.stringifyException(e));
-            return -1;
-        }
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("DeduplicationJob: starting at " + sdf.format(start));
+
+    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
+        + "/dedup-temp-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(getConf());
+
+    job.setJobName("Deduplication on " + crawldb);
+
+    FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    FileOutputFormat.setOutputPath(job, tempDir);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(CrawlDatum.class);
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(CrawlDatum.class);
+
+    job.setMapperClass(DBFilter.class);
+    job.setReducerClass(DedupReducer.class);
+
+    try {
+      RunningJob rj = JobClient.runJob(job);
+      Group g = rj.getCounters().getGroup("DeduplicationJobStatus");
+      if (g != null) {
+        long dups = g.getCounter("Documents marked as duplicate");
+        LOG.info("Deduplication: " + (int) dups
+            + " documents marked as duplicates");
+      }
+    } catch (final Exception e) {
+      LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e));
+      return -1;
+    }
 
-        CrawlDb.install(mergeJob, dbPath);
+    // merge with existing crawl db
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Deduplication: Updating status of duplicate urls into crawl db.");
+    }
 
-        // clean up
-        FileSystem fs = FileSystem.get(getConf());
-        fs.delete(tempDir, true);
-        
-        long end = System.currentTimeMillis();
-        LOG.info("Deduplication finished at " + sdf.format(end)
-                + ", elapsed: " + TimingUtil.elapsedTime(start, end));
-
-        return 0;
-    }
-
-    public static void main(String[] args) throws Exception {
-        int result = ToolRunner.run(NutchConfiguration.create(),
-                new DeduplicationJob(), args);
-        System.exit(result);
+    Path dbPath = new Path(crawldb);
+    JobConf mergeJob = CrawlDb.createJob(getConf(), dbPath);
+    FileInputFormat.addInputPath(mergeJob, tempDir);
+    mergeJob.setReducerClass(StatusUpdateReducer.class);
+
+    try {
+      JobClient.runJob(mergeJob);
+    } catch (final Exception e) {
+      LOG.error("DeduplicationMergeJob: " + StringUtils.stringifyException(e));
+      return -1;
     }
+
+    CrawlDb.install(mergeJob, dbPath);
+
+    // clean up
+    FileSystem fs = FileSystem.get(getConf());
+    fs.delete(tempDir, true);
+
+    long end = System.currentTimeMillis();
+    LOG.info("Deduplication finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(NutchConfiguration.create(),
+        new DeduplicationJob(), args);
+    System.exit(result);
+  }
 }

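The class javadoc above describes the tie-break order used by DedupReducer: higher score wins, then the more recent fetch time, then the shorter URL. A compact sketch of that decision, assuming only the CrawlDatum accessors shown in the diff (the helper class and method are hypothetical):

import org.apache.nutch.crawl.CrawlDatum;

class DedupTieBreak {
  // true if the candidate should be kept and the existing document marked as duplicate
  static boolean candidateWins(CrawlDatum existing, String existingUrl,
      CrawlDatum candidate, String candidateUrl) {
    if (candidate.getScore() != existing.getScore()) {
      return candidate.getScore() > existing.getScore();          // highest score wins
    }
    if (candidate.getFetchTime() != existing.getFetchTime()) {
      return candidate.getFetchTime() > existing.getFetchTime();  // latest timestamp wins
    }
    return candidateUrl.length() < existingUrl.length();          // shortest URL wins
  }
}
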
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/DefaultFetchSchedule.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/DefaultFetchSchedule.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/DefaultFetchSchedule.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/DefaultFetchSchedule.java Thu Jan 29 05:38:59 2015
@@ -20,8 +20,8 @@ package org.apache.nutch.crawl;
 import org.apache.hadoop.io.Text;
 
 /**
- * This class implements the default re-fetch schedule. That is, no matter
- * if the page was changed or not, the <code>fetchInterval</code> remains
+ * This class implements the default re-fetch schedule. That is, no matter if
+ * the page was changed or not, the <code>fetchInterval</code> remains
  * unchanged, and the updated page fetchTime will always be set to
  * <code>fetchTime + fetchInterval * 1000</code>.
  * 
@@ -31,14 +31,14 @@ public class DefaultFetchSchedule extend
 
   @Override
   public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum,
-          long prevFetchTime, long prevModifiedTime,
-          long fetchTime, long modifiedTime, int state) {
+      long prevFetchTime, long prevModifiedTime, long fetchTime,
+      long modifiedTime, int state) {
     datum = super.setFetchSchedule(url, datum, prevFetchTime, prevModifiedTime,
         fetchTime, modifiedTime, state);
-    if (datum.getFetchInterval() == 0 ) {
+    if (datum.getFetchInterval() == 0) {
       datum.setFetchInterval(defaultInterval);
     }
-    datum.setFetchTime(fetchTime + (long)datum.getFetchInterval() * 1000);
+    datum.setFetchTime(fetchTime + (long) datum.getFetchInterval() * 1000);
     datum.setModifiedTime(modifiedTime);
     return datum;
   }

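The schedule rule in DefaultFetchSchedule above is plain arithmetic: the interval is stored in seconds while fetch times are in milliseconds, so the next fetch time is fetchTime + fetchInterval * 1000. A worked sketch with assumed values (not taken from this commit):

public class NextFetchTimeExample {
  public static void main(String[] args) {
    long fetchTime = 1422510000000L;     // time of the last fetch, ms since epoch (assumed)
    int fetchInterval = 30 * 24 * 3600;  // re-fetch every 30 days, stored in seconds
    long nextFetchTime = fetchTime + (long) fetchInterval * 1000L;
    System.out.println("next fetch at " + nextFetchTime + " ms since epoch");
  }
}
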
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/FetchSchedule.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/FetchSchedule.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/FetchSchedule.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/FetchSchedule.java Thu Jan 29 05:38:59 2015
@@ -21,160 +21,188 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.io.Text;
 
 /**
- * This interface defines the contract for implementations that manipulate
- * fetch times and re-fetch intervals.
+ * This interface defines the contract for implementations that manipulate 
fetch
+ * times and re-fetch intervals.
  * 
  * @author Andrzej Bialecki
  */
 public interface FetchSchedule extends Configurable {
-  
+
   /** It is unknown whether page was changed since our last visit. */
-  public static final int STATUS_UNKNOWN       = 0;
+  public static final int STATUS_UNKNOWN = 0;
   /** Page is known to have been modified since our last visit. */
-  public static final int STATUS_MODIFIED      = 1;
+  public static final int STATUS_MODIFIED = 1;
   /** Page is known to remain unmodified since our last visit. */
-  public static final int STATUS_NOTMODIFIED    = 2;
-  
+  public static final int STATUS_NOTMODIFIED = 2;
+
   public static final int SECONDS_PER_DAY = 3600 * 24;
+
   /**
-   * Initialize fetch schedule related data. Implementations should at least
-   * set the <code>fetchTime</code> and <code>fetchInterval</code>. The default
-   * implementation set the <code>fetchTime</code> to now, using the
-   * default <code>fetchInterval</code>.
-   * 
-   * @param url URL of the page.
-   *
-   * @param datum datum instance to be initialized.
-   *
+   * Initialize fetch schedule related data. Implementations should at least set
+   * the <code>fetchTime</code> and <code>fetchInterval</code>. The default
+   * implementation set the <code>fetchTime</code> to now, using the default
+   * <code>fetchInterval</code>.
+   * 
+   * @param url
+   *          URL of the page.
+   * 
+   * @param datum
+   *          datum instance to be initialized.
+   * 
    * @return adjusted page information, including all original information.
-   * NOTE: this may be a different instance than @see CrawlDatum, but
-   * implementations should make sure that it contains at least all
-   * information from @see CrawlDatum.
+   *         NOTE: this may be a different instance than @see CrawlDatum, but
+   *         implementations should make sure that it contains at least all
+   *         information from @see CrawlDatum.
    */
   public CrawlDatum initializeSchedule(Text url, CrawlDatum datum);
-  
+
   /**
    * Sets the <code>fetchInterval</code> and <code>fetchTime</code> on a
-   * successfully fetched page.
-   * Implementations may use supplied arguments to support different re-fetching
-   * schedules.
-   * 
-   * @param url url of the page
-   *
-   * @param datum page description to be adjusted. NOTE: this instance, passed by reference,
-   * may be modified inside the method.
-   *
-   * @param prevFetchTime previous value of fetch time, or 0 if not available.
-   *
-   * @param prevModifiedTime previous value of modifiedTime, or 0 if not available.
-   *
-   * @param fetchTime the latest time, when the page was recently re-fetched. Most FetchSchedule
-   * implementations should update the value in @see CrawlDatum to something greater than this value.
-   *
-   * @param modifiedTime last time the content was modified. This information comes from
-   * the protocol implementations, or is set to < 0 if not available. Most FetchSchedule
-   * implementations should update the value in @see CrawlDatum to this value.
-   *
-   * @param state if {@link #STATUS_MODIFIED}, then the content is considered to be "changed" before the
-   * <code>fetchTime</code>, if {@link #STATUS_NOTMODIFIED} then the content is known to be unchanged.
-   * This information may be obtained by comparing page signatures before and after fetching. If this
-   * is set to {@link #STATUS_UNKNOWN}, then it is unknown whether the page was changed; implementations
-   * are free to follow a sensible default behavior.
-   *
-   * @return adjusted page information, including all original information. NOTE: this may
-   * be a different instance than @see CrawlDatum, but implementations should make sure that
-   * it contains at least all information from @see CrawlDatum}.
+   * successfully fetched page. Implementations may use supplied arguments to
+   * support different re-fetching schedules.
+   * 
+   * @param url
+   *          url of the page
+   * 
+   * @param datum
+   *          page description to be adjusted. NOTE: this instance, passed by
+   *          reference, may be modified inside the method.
+   * 
+   * @param prevFetchTime
+   *          previous value of fetch time, or 0 if not available.
+   * 
+   * @param prevModifiedTime
+   *          previous value of modifiedTime, or 0 if not available.
+   * 
+   * @param fetchTime
+   *          the latest time, when the page was recently re-fetched. Most
+   *          FetchSchedule implementations should update the value in @see
+   *          CrawlDatum to something greater than this value.
+   * 
+   * @param modifiedTime
+   *          last time the content was modified. This information comes from
+   *          the protocol implementations, or is set to < 0 if not available.
+   *          Most FetchSchedule implementations should update the value in @see
+   *          CrawlDatum to this value.
+   * 
+   * @param state
+   *          if {@link #STATUS_MODIFIED}, then the content is considered to be
+   *          "changed" before the <code>fetchTime</code>, if
+   *          {@link #STATUS_NOTMODIFIED} then the content is known to be
+   *          unchanged. This information may be obtained by comparing page
+   *          signatures before and after fetching. If this is set to
+   *          {@link #STATUS_UNKNOWN}, then it is unknown whether the page was
+   *          changed; implementations are free to follow a sensible default
+   *          behavior.
+   * 
+   * @return adjusted page information, including all original information.
+   *         NOTE: this may be a different instance than @see CrawlDatum, but
+   *         implementations should make sure that it contains at least all
+   *         information from @see CrawlDatum}.
    */
   public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum,
-          long prevFetchTime, long prevModifiedTime,
-          long fetchTime, long modifiedTime, int state);
-  
-  /**
-   * This method specifies how to schedule refetching of pages
-   * marked as GONE. Default implementation increases fetchInterval by 50%,
-   * and if it exceeds the <code>maxInterval</code> it calls
+      long prevFetchTime, long prevModifiedTime, long fetchTime,
+      long modifiedTime, int state);
+
+  /**
+   * This method specifies how to schedule refetching of pages marked as GONE.
+   * Default implementation increases fetchInterval by 50%, and if it exceeds
+   * the <code>maxInterval</code> it calls
    * {@link #forceRefetch(Text, CrawlDatum, boolean)}.
-   *
-   * @param url URL of the page
-   *
-   * @param datum datum instance to be adjusted.
-   *
+   * 
+   * @param url
+   *          URL of the page
+   * 
+   * @param datum
+   *          datum instance to be adjusted.
+   * 
    * @return adjusted page information, including all original information.
-   * NOTE: this may be a different instance than @see CrawlDatum, but
-   * implementations should make sure that it contains at least all
-   * information from @see CrawlDatum.
+   *         NOTE: this may be a different instance than @see CrawlDatum, but
+   *         implementations should make sure that it contains at least all
+   *         information from @see CrawlDatum.
    */
   public CrawlDatum setPageGoneSchedule(Text url, CrawlDatum datum,
-          long prevFetchTime, long prevModifiedTime, long fetchTime);
-  
+      long prevFetchTime, long prevModifiedTime, long fetchTime);
+
   /**
-   * This method adjusts the fetch schedule if fetching needs to be
-   * re-tried due to transient errors. The default implementation
-   * sets the next fetch time 1 day in the future and increases the
-   * retry counter.
-   *
-   * @param url URL of the page.
-   *
-   * @param datum page information.
-   *
-   * @param prevFetchTime previous fetch time.
-   *
-   * @param prevModifiedTime previous modified time.
-   *
-   * @param fetchTime current fetch time.
-   *
+   * This method adjusts the fetch schedule if fetching needs to be re-tried due
+   * to transient errors. The default implementation sets the next fetch time 1
+   * day in the future and increases the retry counter.
+   * 
+   * @param url
+   *          URL of the page.
+   * 
+   * @param datum
+   *          page information.
+   * 
+   * @param prevFetchTime
+   *          previous fetch time.
+   * 
+   * @param prevModifiedTime
+   *          previous modified time.
+   * 
+   * @param fetchTime
+   *          current fetch time.
+   * 
    * @return adjusted page information, including all original information.
-   * NOTE: this may be a different instance than @see CrawlDatum, but
-   * implementations should make sure that it contains at least all
-   * information from @see CrawlDatum.
+   *         NOTE: this may be a different instance than @see CrawlDatum, but
+   *         implementations should make sure that it contains at least all
+   *         information from @see CrawlDatum.
    */
   public CrawlDatum setPageRetrySchedule(Text url, CrawlDatum datum,
-          long prevFetchTime, long prevModifiedTime, long fetchTime);
-  
+      long prevFetchTime, long prevModifiedTime, long fetchTime);
+
   /**
    * Calculates last fetch time of the given CrawlDatum.
+   * 
    * @return the date as a long.
    */
   public long calculateLastFetchTime(CrawlDatum datum);
 
   /**
-   * This method provides information whether the page is suitable for
-   * selection in the current fetchlist. NOTE: a true return value does not
-   * guarantee that the page will be fetched, it just allows it to be
-   * included in the further selection process based on scores. The default
-   * implementation checks <code>fetchTime</code>, if it is higher than the
-   * curTime it returns false, and true otherwise. It will also
-   * check that fetchTime is not too remote (more than <code>maxInterval</code),
-   * in which case it lowers the interval and returns true.
-   *
-   * @param url URL of the page.
-   *
-   * @param datum datum instance.
-   *
-   * @param curTime reference time (usually set to the time when the
-   * fetchlist generation process was started).
-   *
+   * This method provides information whether the page is suitable for selection
+   * in the current fetchlist. NOTE: a true return value does not guarantee that
+   * the page will be fetched, it just allows it to be included in the further
+   * selection process based on scores. The default implementation checks
+   * <code>fetchTime</code>, if it is higher than the curTime it returns false,
+   * and true otherwise. It will also check that fetchTime is not too remote
+   * (more than <code>maxInterval</code), in which case it lowers the interval
+   * and returns true.
+   * 
+   * @param url
+   *          URL of the page.
+   * 
+   * @param datum
+   *          datum instance.
+   * 
+   * @param curTime
+   *          reference time (usually set to the time when the fetchlist
+   *          generation process was started).
+   * 
   * @return true, if the page should be considered for inclusion in the current
-   * fetchlist, otherwise false.
+   *         fetchlist, otherwise false.
    */
   public boolean shouldFetch(Text url, CrawlDatum datum, long curTime);
-  
+
   /**
-   * This method resets fetchTime, fetchInterval, modifiedTime and
-   * page signature, so that it forces refetching.
-   *
-   * @param url URL of the page.
-   *
-   * @param datum datum instance.
-   *
-   * @param asap if true, force refetch as soon as possible - this sets
-   * the fetchTime to now. If false, force refetch whenever the next fetch
-   * time is set.
-   *
+   * This method resets fetchTime, fetchInterval, modifiedTime and page
+   * signature, so that it forces refetching.
+   * 
+   * @param url
+   *          URL of the page.
+   * 
+   * @param datum
+   *          datum instance.
+   * 
+   * @param asap
+   *          if true, force refetch as soon as possible - this sets the
+   *          fetchTime to now. If false, force refetch whenever the next fetch
+   *          time is set.
+   * 
    * @return adjusted page information, including all original information.
-   * NOTE: this may be a different instance than @see CrawlDatum, but
-   * implementations should make sure that it contains at least all
-   * information from @see CrawlDatum.
+   *         NOTE: this may be a different instance than @see CrawlDatum, but
+   *         implementations should make sure that it contains at least all
+   *         information from @see CrawlDatum.
    */
   public CrawlDatum forceRefetch(Text url, CrawlDatum datum, boolean asap);
 }
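
As a rough illustration of the contract documented above, a sketch of a custom schedule that could be wired in via db.fetch.schedule.class; the class name, package, and interval policy are invented, and it assumes the usual pattern of extending AbstractFetchSchedule (as DefaultFetchSchedule does) so only setFetchSchedule needs overriding.

package org.example.nutch; // hypothetical package, for illustration only

import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.AbstractFetchSchedule;
import org.apache.nutch.crawl.CrawlDatum;

public class BackoffFetchSchedule extends AbstractFetchSchedule {

  // Illustrative cap; a real implementation would read db.fetch.interval.max.
  private static final int MAX_INTERVAL = 90 * SECONDS_PER_DAY;

  @Override
  public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum,
      long prevFetchTime, long prevModifiedTime, long fetchTime,
      long modifiedTime, int state) {
    datum = super.setFetchSchedule(url, datum, prevFetchTime, prevModifiedTime,
        fetchTime, modifiedTime, state);
    int interval = datum.getFetchInterval();
    if (state == STATUS_NOTMODIFIED) {
      // Unchanged page: back off by 50%, but never beyond the cap.
      interval = (int) Math.min(interval * 1.5f, (float) MAX_INTERVAL);
    } else if (state == STATUS_MODIFIED) {
      // Changed page: re-visit sooner, but at most once a day.
      interval = Math.max(interval / 2, SECONDS_PER_DAY);
    }
    datum.setFetchInterval(interval);
    datum.setFetchTime(fetchTime + (long) interval * 1000L);
    datum.setModifiedTime(modifiedTime);
    return datum;
  }
}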

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/FetchScheduleFactory.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/FetchScheduleFactory.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/FetchScheduleFactory.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/FetchScheduleFactory.java Thu 
Jan 29 05:38:59 2015
@@ -25,20 +25,23 @@ import org.apache.nutch.util.ObjectCache
 /** Creates and caches a {@link FetchSchedule} implementation. */
 public class FetchScheduleFactory {
 
-  public static final Logger LOG = LoggerFactory.getLogger(FetchScheduleFactory.class);
+  public static final Logger LOG = LoggerFactory
+      .getLogger(FetchScheduleFactory.class);
 
-  private FetchScheduleFactory() {}                   // no public ctor
+  private FetchScheduleFactory() {
+  } // no public ctor
 
   /** Return the FetchSchedule implementation. */
  public synchronized static FetchSchedule getFetchSchedule(Configuration conf) {
-    String clazz = conf.get("db.fetch.schedule.class", DefaultFetchSchedule.class.getName());
+    String clazz = conf.get("db.fetch.schedule.class",
+        DefaultFetchSchedule.class.getName());
     ObjectCache objectCache = ObjectCache.get(conf);
-    FetchSchedule impl = (FetchSchedule)objectCache.getObject(clazz);
+    FetchSchedule impl = (FetchSchedule) objectCache.getObject(clazz);
     if (impl == null) {
       try {
         LOG.info("Using FetchSchedule impl: " + clazz);
         Class<?> implClass = Class.forName(clazz);
-        impl = (FetchSchedule)implClass.newInstance();
+        impl = (FetchSchedule) implClass.newInstance();
         impl.setConf(conf);
         objectCache.setObject(clazz, impl);
       } catch (Exception e) {
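
For context on how the reformatted factory is used, a small lookup sketch; the property value shown is one of the shipped implementations, while the driver class itself is hypothetical.

// Hypothetical lookup sketch, not part of the commit.
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.FetchSchedule;
import org.apache.nutch.crawl.FetchScheduleFactory;
import org.apache.nutch.util.NutchConfiguration;

public class ScheduleLookupExample {
  public static void main(String[] args) {
    Configuration conf = NutchConfiguration.create();
    // Without this property the factory falls back to DefaultFetchSchedule.
    conf.set("db.fetch.schedule.class",
        "org.apache.nutch.crawl.AdaptiveFetchSchedule");
    FetchSchedule first = FetchScheduleFactory.getFetchSchedule(conf);
    FetchSchedule second = FetchScheduleFactory.getFetchSchedule(conf);
    // The ObjectCache keyed by this Configuration returns the same cached instance.
    System.out.println(first.getClass().getName() + ", cached: " + (first == second));
  }
}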

