Modified: nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original) +++ nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Thu Jan 29 05:38:59 2015 @@ -30,7 +30,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.TreeMap; - // Commons Logging imports import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,9 +65,9 @@ import org.apache.nutch.util.StringUtil; /** * Read utility for the CrawlDB. - * + * * @author Andrzej Bialecki - * + * */ public class CrawlDbReader implements Closeable { @@ -76,15 +75,18 @@ public class CrawlDbReader implements Cl private MapFile.Reader[] readers = null; - private void openReaders(String crawlDb, Configuration config) throws IOException { - if (readers != null) return; + private void openReaders(String crawlDb, Configuration config) + throws IOException { + if (readers != null) + return; FileSystem fs = FileSystem.get(config); readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb, CrawlDb.CURRENT_NAME), config); } private void closeReaders() { - if (readers == null) return; + if (readers == null) + return; for (int i = 0; i < readers.length; i++) { try { readers[i].close(); @@ -94,55 +96,61 @@ public class CrawlDbReader implements Cl } } - public static class CrawlDatumCsvOutputFormat extends FileOutputFormat<Text,CrawlDatum> { - protected static class LineRecordWriter implements RecordWriter<Text,CrawlDatum> { + public static class CrawlDatumCsvOutputFormat extends + FileOutputFormat<Text, CrawlDatum> { + protected static class LineRecordWriter implements + RecordWriter<Text, CrawlDatum> { private DataOutputStream out; + public LineRecordWriter(DataOutputStream out) { this.out = out; try { out.writeBytes("Url,Status code,Status name,Fetch Time,Modified Time,Retries since fetch,Retry interval seconds,Retry interval days,Score,Signature,Metadata\n"); - } catch (IOException e) {} + } catch (IOException e) { + } } - public synchronized void write(Text key, CrawlDatum value) throws IOException { - out.writeByte('"'); - out.writeBytes(key.toString()); - out.writeByte('"'); - out.writeByte(','); - out.writeBytes(Integer.toString(value.getStatus())); - out.writeByte(','); - out.writeByte('"'); - out.writeBytes(CrawlDatum.getStatusName(value.getStatus())); - out.writeByte('"'); - out.writeByte(','); - out.writeBytes(new Date(value.getFetchTime()).toString()); - out.writeByte(','); - out.writeBytes(new Date(value.getModifiedTime()).toString()); - out.writeByte(','); - out.writeBytes(Integer.toString(value.getRetriesSinceFetch())); - out.writeByte(','); - out.writeBytes(Float.toString(value.getFetchInterval())); - out.writeByte(','); - out.writeBytes(Float.toString((value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY))); - out.writeByte(','); - out.writeBytes(Float.toString(value.getScore())); - out.writeByte(','); - out.writeByte('"'); - out.writeBytes(value.getSignature() != null ? 
StringUtil.toHexString(value.getSignature()): "null"); - out.writeByte('"'); - out.writeByte(','); - out.writeByte('"'); - if (value.getMetaData() != null) { - for (Entry<Writable, Writable> e : value.getMetaData().entrySet()) { - out.writeBytes(e.getKey().toString()); - out.writeByte(':'); - out.writeBytes(e.getValue().toString()); - out.writeBytes("|||"); - } + public synchronized void write(Text key, CrawlDatum value) + throws IOException { + out.writeByte('"'); + out.writeBytes(key.toString()); + out.writeByte('"'); + out.writeByte(','); + out.writeBytes(Integer.toString(value.getStatus())); + out.writeByte(','); + out.writeByte('"'); + out.writeBytes(CrawlDatum.getStatusName(value.getStatus())); + out.writeByte('"'); + out.writeByte(','); + out.writeBytes(new Date(value.getFetchTime()).toString()); + out.writeByte(','); + out.writeBytes(new Date(value.getModifiedTime()).toString()); + out.writeByte(','); + out.writeBytes(Integer.toString(value.getRetriesSinceFetch())); + out.writeByte(','); + out.writeBytes(Float.toString(value.getFetchInterval())); + out.writeByte(','); + out.writeBytes(Float.toString((value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY))); + out.writeByte(','); + out.writeBytes(Float.toString(value.getScore())); + out.writeByte(','); + out.writeByte('"'); + out.writeBytes(value.getSignature() != null ? StringUtil + .toHexString(value.getSignature()) : "null"); + out.writeByte('"'); + out.writeByte(','); + out.writeByte('"'); + if (value.getMetaData() != null) { + for (Entry<Writable, Writable> e : value.getMetaData().entrySet()) { + out.writeBytes(e.getKey().toString()); + out.writeByte(':'); + out.writeBytes(e.getValue().toString()); + out.writeBytes("|||"); } - out.writeByte('"'); + } + out.writeByte('"'); - out.writeByte('\n'); + out.writeByte('\n'); } public synchronized void close(Reporter reporter) throws IOException { @@ -150,42 +158,59 @@ public class CrawlDbReader implements Cl } } - public RecordWriter<Text,CrawlDatum> getRecordWriter(FileSystem fs, JobConf job, String name, - Progressable progress) throws IOException { + public RecordWriter<Text, CrawlDatum> getRecordWriter(FileSystem fs, + JobConf job, String name, Progressable progress) throws IOException { Path dir = FileOutputFormat.getOutputPath(job); DataOutputStream fileOut = fs.create(new Path(dir, name), progress); return new LineRecordWriter(fileOut); - } + } } - public static class CrawlDbStatMapper implements Mapper<Text, CrawlDatum, Text, LongWritable> { + public static class CrawlDbStatMapper implements + Mapper<Text, CrawlDatum, Text, LongWritable> { LongWritable COUNT_1 = new LongWritable(1); private boolean sort = false; + public void configure(JobConf job) { - sort = job.getBoolean("db.reader.stats.sort", false ); + sort = job.getBoolean("db.reader.stats.sort", false); } - public void close() {} - public void map(Text key, CrawlDatum value, OutputCollector<Text, LongWritable> output, Reporter reporter) - throws IOException { + + public void close() { + } + + public void map(Text key, CrawlDatum value, + OutputCollector<Text, LongWritable> output, Reporter reporter) + throws IOException { output.collect(new Text("T"), COUNT_1); output.collect(new Text("status " + value.getStatus()), COUNT_1); - output.collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1); - output.collect(new Text("s"), new LongWritable((long) (value.getScore() * 1000.0))); - if(sort){ + output + .collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1); + output.collect(new Text("s"), new 
LongWritable( + (long) (value.getScore() * 1000.0))); + if (sort) { URL u = new URL(key.toString()); String host = u.getHost(); - output.collect(new Text("status " + value.getStatus() + " " + host), COUNT_1); + output.collect(new Text("status " + value.getStatus() + " " + host), + COUNT_1); } } } - public static class CrawlDbStatCombiner implements Reducer<Text, LongWritable, Text, LongWritable> { + public static class CrawlDbStatCombiner implements + Reducer<Text, LongWritable, Text, LongWritable> { LongWritable val = new LongWritable(); - public CrawlDbStatCombiner() { } - public void configure(JobConf job) { } - public void close() {} - public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter) + public CrawlDbStatCombiner() { + } + + public void configure(JobConf job) { + } + + public void close() { + } + + public void reduce(Text key, Iterator<LongWritable> values, + OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { val.set(0L); String k = key.toString(); @@ -201,8 +226,10 @@ public class CrawlDbReader implements Cl long max = Long.MIN_VALUE; while (values.hasNext()) { LongWritable cnt = values.next(); - if (cnt.get() < min) min = cnt.get(); - if (cnt.get() > max) max = cnt.get(); + if (cnt.get() < min) + min = cnt.get(); + if (cnt.get() > max) + max = cnt.get(); total += cnt.get(); } output.collect(new Text("scn"), new LongWritable(min)); @@ -212,11 +239,17 @@ public class CrawlDbReader implements Cl } } - public static class CrawlDbStatReducer implements Reducer<Text, LongWritable, Text, LongWritable> { - public void configure(JobConf job) {} - public void close() {} - public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter) - throws IOException { + public static class CrawlDbStatReducer implements + Reducer<Text, LongWritable, Text, LongWritable> { + public void configure(JobConf job) { + } + + public void close() { + } + + public void reduce(Text key, Iterator<LongWritable> values, + OutputCollector<Text, LongWritable> output, Reporter reporter) + throws IOException { String k = key.toString(); if (k.equals("T")) { @@ -238,14 +271,16 @@ public class CrawlDbReader implements Cl LongWritable cnt = new LongWritable(Long.MIN_VALUE); while (values.hasNext()) { LongWritable val = values.next(); - if (cnt.get() < val.get()) cnt.set(val.get()); + if (cnt.get() < val.get()) + cnt.set(val.get()); } output.collect(key, cnt); } else if (k.equals("scn")) { LongWritable cnt = new LongWritable(Long.MAX_VALUE); while (values.hasNext()) { LongWritable val = values.next(); - if (cnt.get() > val.get()) cnt.set(val.get()); + if (cnt.get() > val.get()) + cnt.set(val.get()); } output.collect(key, cnt); } else if (k.equals("sct")) { @@ -259,27 +294,36 @@ public class CrawlDbReader implements Cl } } - public static class CrawlDbTopNMapper implements Mapper<Text, CrawlDatum, FloatWritable, Text> { + public static class CrawlDbTopNMapper implements + Mapper<Text, CrawlDatum, FloatWritable, Text> { private static final FloatWritable fw = new FloatWritable(); private float min = 0.0f; public void configure(JobConf job) { min = job.getFloat("db.reader.topn.min", 0.0f); } - public void close() {} - public void map(Text key, CrawlDatum value, OutputCollector<FloatWritable, Text> output, Reporter reporter) - throws IOException { - if (value.getScore() < min) return; // don't collect low-scoring records + + public void close() { + } + + public void 
map(Text key, CrawlDatum value, + OutputCollector<FloatWritable, Text> output, Reporter reporter) + throws IOException { + if (value.getScore() < min) + return; // don't collect low-scoring records fw.set(-value.getScore()); // reverse sorting order output.collect(fw, key); // invert mapping: score -> url } } - public static class CrawlDbTopNReducer implements Reducer<FloatWritable, Text, FloatWritable, Text> { + public static class CrawlDbTopNReducer implements + Reducer<FloatWritable, Text, FloatWritable, Text> { private long topN; private long count = 0L; - public void reduce(FloatWritable key, Iterator<Text> values, OutputCollector<FloatWritable, Text> output, Reporter reporter) throws IOException { + public void reduce(FloatWritable key, Iterator<Text> values, + OutputCollector<FloatWritable, Text> output, Reporter reporter) + throws IOException { while (values.hasNext() && count < topN) { key.set(-key.get()); output.collect(key, values.next()); @@ -291,14 +335,16 @@ public class CrawlDbReader implements Cl topN = job.getLong("db.reader.topn", 100) / job.getNumReduceTasks(); } - public void close() {} + public void close() { + } } public void close() { closeReaders(); } - public void processStatJob(String crawlDb, Configuration config, boolean sort) throws IOException { + public void processStatJob(String crawlDb, Configuration config, boolean sort) + throws IOException { if (LOG.isInfoEnabled()) { LOG.info("CrawlDb statistics start: " + crawlDb); @@ -329,7 +375,8 @@ public class CrawlDbReader implements Cl // reading the result FileSystem fileSystem = FileSystem.get(config); - SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config, tmpFolder); + SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config, + tmpFolder); Text key = new Text(); LongWritable value = new LongWritable(); @@ -342,14 +389,18 @@ public class CrawlDbReader implements Cl LongWritable val = stats.get(k); if (val == null) { val = new LongWritable(); - if (k.equals("scx")) val.set(Long.MIN_VALUE); - if (k.equals("scn")) val.set(Long.MAX_VALUE); + if (k.equals("scx")) + val.set(Long.MIN_VALUE); + if (k.equals("scn")) + val.set(Long.MAX_VALUE); stats.put(k, val); } if (k.equals("scx")) { - if (val.get() < value.get()) val.set(value.get()); + if (val.get() < value.get()) + val.set(value.get()); } else if (k.equals("scn")) { - if (val.get() > value.get()) val.set(value.get()); + if (val.get() > value.get()) + val.set(value.get()); } else { val.set(val.get() + value.get()); } @@ -370,31 +421,40 @@ public class CrawlDbReader implements Cl } else if (k.equals("scx")) { LOG.info("max score:\t" + (val.get() / 1000.0f)); } else if (k.equals("sct")) { - LOG.info("avg score:\t" + (float) ((((double)val.get()) / totalCnt.get()) / 1000.0)); + LOG.info("avg score:\t" + + (float) ((((double) val.get()) / totalCnt.get()) / 1000.0)); } else if (k.startsWith("status")) { String[] st = k.split(" "); int code = Integer.parseInt(st[1]); - if(st.length >2 ) LOG.info(" " + st[2] +" :\t" + val); - else LOG.info(st[0] +" " +code + " (" + CrawlDatum.getStatusName((byte) code) + "):\t" + val); - } else LOG.info(k + ":\t" + val); + if (st.length > 2) + LOG.info(" " + st[2] + " :\t" + val); + else + LOG.info(st[0] + " " + code + " (" + + CrawlDatum.getStatusName((byte) code) + "):\t" + val); + } else + LOG.info(k + ":\t" + val); } } // removing the tmp folder fileSystem.delete(tmpFolder, true); - if (LOG.isInfoEnabled()) { LOG.info("CrawlDb statistics: done"); } + if (LOG.isInfoEnabled()) { + 
LOG.info("CrawlDb statistics: done"); + } } - public CrawlDatum get(String crawlDb, String url, Configuration config) throws IOException { + public CrawlDatum get(String crawlDb, String url, Configuration config) + throws IOException { Text key = new Text(url); CrawlDatum val = new CrawlDatum(); openReaders(crawlDb, config); - CrawlDatum res = (CrawlDatum)MapFileOutputFormat.getEntry(readers, + CrawlDatum res = (CrawlDatum) MapFileOutputFormat.getEntry(readers, new HashPartitioner<Text, CrawlDatum>(), key, val); return res; } - public void readUrl(String crawlDb, String url, Configuration config) throws IOException { + public void readUrl(String crawlDb, String url, Configuration config) + throws IOException { CrawlDatum res = get(crawlDb, url, config); System.out.println("URL: " + url); if (res != null) { @@ -404,7 +464,9 @@ public class CrawlDbReader implements Cl } } - public void processDumpJob(String crawlDb, String output, Configuration config, String format, String regex, String status, Integer retry) throws IOException { + public void processDumpJob(String crawlDb, String output, + Configuration config, String format, String regex, String status, + Integer retry) throws IOException { if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: starting"); LOG.info("CrawlDb db: " + crawlDb); @@ -421,26 +483,31 @@ public class CrawlDbReader implements Cl if (format.equals("csv")) { job.setOutputFormat(CrawlDatumCsvOutputFormat.class); - } - else if (format.equals("crawldb")) { + } else if (format.equals("crawldb")) { job.setOutputFormat(MapFileOutputFormat.class); } else { job.setOutputFormat(TextOutputFormat.class); } - if (status != null) job.set("status", status); - if (regex != null) job.set("regex", regex); - if (retry != null) job.setInt("retry", retry); - + if (status != null) + job.set("status", status); + if (regex != null) + job.set("regex", regex); + if (retry != null) + job.setInt("retry", retry); + job.setMapperClass(CrawlDbDumpMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CrawlDatum.class); JobClient.runJob(job); - if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); } + if (LOG.isInfoEnabled()) { + LOG.info("CrawlDb dump: done"); + } } - public static class CrawlDbDumpMapper implements Mapper<Text, CrawlDatum, Text, CrawlDatum> { + public static class CrawlDbDumpMapper implements + Mapper<Text, CrawlDatum, Text, CrawlDatum> { Pattern pattern = null; Matcher matcher = null; String status = null; @@ -454,10 +521,13 @@ public class CrawlDbReader implements Cl retry = job.getInt("retry", -1); } - public void close() {} - public void map(Text key, CrawlDatum value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) - throws IOException { - + public void close() { + } + + public void map(Text key, CrawlDatum value, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + // check retry if (retry != -1) { if (value.getRetriesSinceFetch() < retry) { @@ -467,7 +537,9 @@ public class CrawlDbReader implements Cl // check status if (status != null - && !status.equalsIgnoreCase(CrawlDatum.getStatusName(value.getStatus()))) return; + && !status.equalsIgnoreCase(CrawlDatum.getStatusName(value + .getStatus()))) + return; // check regex if (pattern != null) { @@ -481,7 +553,8 @@ public class CrawlDbReader implements Cl } } - public void processTopNJob(String crawlDb, long topN, float min, String output, Configuration config) throws IOException { + public void processTopNJob(String crawlDb, long topN, float min, + String 
output, Configuration config) throws IOException { if (LOG.isInfoEnabled()) { LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")"); @@ -489,10 +562,9 @@ public class CrawlDbReader implements Cl } Path outFolder = new Path(output); - Path tempDir = - new Path(config.get("mapred.temp.dir", ".") + - "/readdb-topN-temp-"+ - Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + Path tempDir = new Path(config.get("mapred.temp.dir", ".") + + "/readdb-topN-temp-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); JobConf job = new NutchJob(config); job.setJobName("topN prepare " + crawlDb); @@ -531,7 +603,9 @@ public class CrawlDbReader implements Cl JobClient.runJob(job); FileSystem fs = FileSystem.get(config); fs.delete(tempDir, true); - if (LOG.isInfoEnabled()) { LOG.info("CrawlDb topN: done"); } + if (LOG.isInfoEnabled()) { + LOG.info("CrawlDb topN: done"); + } } @@ -540,20 +614,29 @@ public class CrawlDbReader implements Cl CrawlDbReader dbr = new CrawlDbReader(); if (args.length < 2) { - System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)"); - System.err.println("\t<crawldb>\tdirectory name where crawldb is located"); - System.err.println("\t-stats [-sort] \tprint overall statistics to System.out"); + System.err + .println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)"); + System.err + .println("\t<crawldb>\tdirectory name where crawldb is located"); + System.err + .println("\t-stats [-sort] \tprint overall statistics to System.out"); System.err.println("\t\t[-sort]\tlist status sorted by host"); - System.err.println("\t-dump <out_dir> [-format normal|csv|crawldb]\tdump the whole db to a text file in <out_dir>"); + System.err + .println("\t-dump <out_dir> [-format normal|csv|crawldb]\tdump the whole db to a text file in <out_dir>"); System.err.println("\t\t[-format csv]\tdump in Csv format"); - System.err.println("\t\t[-format normal]\tdump in standard format (default option)"); + System.err + .println("\t\t[-format normal]\tdump in standard format (default option)"); System.err.println("\t\t[-format crawldb]\tdump as CrawlDB"); System.err.println("\t\t[-regex <expr>]\tfilter records with expression"); System.err.println("\t\t[-retry <num>]\tminimum retry count"); - System.err.println("\t\t[-status <status>]\tfilter records by CrawlDatum status"); - System.err.println("\t-url <url>\tprint information on <url> to System.out"); - System.err.println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>"); - System.err.println("\t\t[<min>]\tskip records with scores below this value."); + System.err + .println("\t\t[-status <status>]\tfilter records by CrawlDatum status"); + System.err + .println("\t-url <url>\tprint information on <url> to System.out"); + System.err + .println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>"); + System.err + .println("\t\t[<min>]\tskip records with scores below this value."); System.err.println("\t\t\tThis can significantly improve performance."); return; } @@ -563,7 +646,7 @@ public class CrawlDbReader implements Cl for (int i = 1; i < args.length; i++) { if (args[i].equals("-stats")) { boolean toSort = false; - if(i < args.length - 1 && "-sort".equals(args[i+1])){ + if (i < args.length - 1 && "-sort".equals(args[i + 1])) { toSort = true; i++; } @@ -577,19 +660,19 @@ public class CrawlDbReader implements Cl for (int j = i + 1; 
j < args.length; j++) { if (args[j].equals("-format")) { format = args[++j]; - i=i+2; + i = i + 2; } if (args[j].equals("-regex")) { regex = args[++j]; - i=i+2; + i = i + 2; } if (args[j].equals("-retry")) { retry = Integer.parseInt(args[++j]); - i=i+2; + i = i + 2; } if (args[j].equals("-status")) { status = args[++j]; - i=i+2; + i = i + 2; } } dbr.processDumpJob(crawlDb, param, conf, format, regex, status, retry);
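
[Editor's note, not part of the commit] For readers skimming the re-indented CrawlDbStatMapper/Combiner/Reducer above, the following standalone sketch shows the score bookkeeping those classes perform: the mapper scales each CrawlDatum score by 1000 into a long under key "s", the combiner folds those into per-split min ("scn"), max ("scx") and total ("sct"), and processStatJob divides by 1000 again when it logs min/max/avg. The class name and sample values below are illustrative only; this is a plain-Java distillation of the logic, not Nutch code.

    // Standalone sketch of the score statistics carried through the stat job.
    import java.util.Arrays;
    import java.util.List;

    public class ScoreStatsSketch {
      public static void main(String[] args) {
        // pretend these are the CrawlDatum scores seen by CrawlDbStatMapper
        List<Float> scores = Arrays.asList(0.5f, 1.25f, 0.033f, 2.0f);

        long min = Long.MAX_VALUE;   // becomes the "scn" value
        long max = Long.MIN_VALUE;   // becomes the "scx" value
        long total = 0L;             // becomes the "sct" value
        long count = 0L;             // becomes the "T" (total URLs) value

        for (float score : scores) {
          long scaled = (long) (score * 1000.0); // same scaling as the mapper's "s" output
          if (scaled < min) min = scaled;
          if (scaled > max) max = scaled;
          total += scaled;
          count++;
        }

        // same unscaling as processStatJob when it prints the statistics
        System.out.println("min score:\t" + (min / 1000.0f));
        System.out.println("max score:\t" + (max / 1000.0f));
        System.out.println("avg score:\t" + (float) (((double) total / count) / 1000.0));
      }
    }
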
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java (original) +++ nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java Thu Jan 29 05:38:59 2015 @@ -35,9 +35,11 @@ import org.apache.nutch.scoring.ScoringF import org.apache.nutch.scoring.ScoringFilters; /** Merge new page entries with existing entries. */ -public class CrawlDbReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> { - public static final Logger LOG = LoggerFactory.getLogger(CrawlDbReducer.class); - +public class CrawlDbReducer implements + Reducer<Text, CrawlDatum, Text, CrawlDatum> { + public static final Logger LOG = LoggerFactory + .getLogger(CrawlDbReducer.class); + private int retryMax; private CrawlDatum result = new CrawlDatum(); private InlinkPriorityQueue linked = null; @@ -50,17 +52,18 @@ public class CrawlDbReducer implements R retryMax = job.getInt("db.fetch.retry.max", 3); scfilters = new ScoringFilters(job); additionsAllowed = job.getBoolean(CrawlDb.CRAWLDB_ADDITIONS_ALLOWED, true); - maxInterval = job.getInt("db.fetch.interval.max", 0 ); + maxInterval = job.getInt("db.fetch.interval.max", 0); schedule = FetchScheduleFactory.getFetchSchedule(job); int maxLinks = job.getInt("db.update.max.inlinks", 10000); linked = new InlinkPriorityQueue(maxLinks); } - public void close() {} + public void close() { + } public void reduce(Text key, Iterator<CrawlDatum> values, - OutputCollector<Text, CrawlDatum> output, Reporter reporter) - throws IOException { + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { CrawlDatum fetch = new CrawlDatum(); CrawlDatum old = new CrawlDatum(); @@ -71,10 +74,11 @@ public class CrawlDbReducer implements R boolean multiple = false; // avoid deep copy when only single value exists linked.clear(); org.apache.hadoop.io.MapWritable metaFromParse = null; - + while (values.hasNext()) { CrawlDatum datum = values.next(); - if (!multiple && values.hasNext()) multiple = true; + if (!multiple && values.hasNext()) + multiple = true; if (CrawlDatum.hasDbStatus(datum)) { if (!oldSet) { if (multiple) { @@ -86,7 +90,8 @@ public class CrawlDbReducer implements R oldSet = true; } else { // always take the latest version - if (old.getFetchTime() < datum.getFetchTime()) old.set(datum); + if (old.getFetchTime() < datum.getFetchTime()) + old.set(datum); } continue; } @@ -101,12 +106,13 @@ public class CrawlDbReducer implements R fetchSet = true; } else { // always take the latest version - if (fetch.getFetchTime() < datum.getFetchTime()) fetch.set(datum); + if (fetch.getFetchTime() < datum.getFetchTime()) + fetch.set(datum); } continue; } - switch (datum.getStatus()) { // collect other info + switch (datum.getStatus()) { // collect other info case CrawlDatum.STATUS_LINKED: CrawlDatum link; if (multiple) { @@ -127,7 +133,7 @@ public class CrawlDbReducer implements R LOG.warn("Unknown status, key: " + key + ", datum: " + datum); } } - + // copy the content of the queue into a List // in reversed order int numLinks = linked.size(); @@ -135,28 +141,31 @@ public class CrawlDbReducer implements R for (int i = numLinks - 1; i >= 0; i--) { linkList.add(linked.pop()); } - + // if it doesn't already exist, skip it - if (!oldSet && !additionsAllowed) return; - + if 
(!oldSet && !additionsAllowed) + return; + // if there is no fetched datum, perhaps there is a link if (!fetchSet && linkList.size() > 0) { fetch = linkList.get(0); fetchSet = true; } - + // still no new data - record only unchanged old data, if exists, and return if (!fetchSet) { if (oldSet) {// at this point at least "old" should be present output.collect(key, old); - reporter.getCounter("CrawlDB status", CrawlDatum.getStatusName(old.getStatus())).increment(1); + reporter.getCounter("CrawlDB status", + CrawlDatum.getStatusName(old.getStatus())).increment(1); } else { LOG.warn("Missing fetch and old value, signature=" + signature); } return; } - - if (signature == null) signature = fetch.getSignature(); + + if (signature == null) + signature = fetch.getSignature(); long prevModifiedTime = oldSet ? old.getModifiedTime() : 0L; long prevFetchTime = oldSet ? old.getFetchTime() : 0L; @@ -175,12 +184,12 @@ public class CrawlDbReducer implements R result.setModifiedTime(old.getModifiedTime()); } } - - switch (fetch.getStatus()) { // determine new status - case CrawlDatum.STATUS_LINKED: // it was link - if (oldSet) { // if old exists - result.set(old); // use it + switch (fetch.getStatus()) { // determine new status + + case CrawlDatum.STATUS_LINKED: // it was link + if (oldSet) { // if old exists + result.set(old); // use it } else { result = schedule.initializeSchedule(key, result); result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); @@ -188,18 +197,18 @@ public class CrawlDbReducer implements R scfilters.initialScore(key, result); } catch (ScoringFilterException e) { if (LOG.isWarnEnabled()) { - LOG.warn("Cannot filter init score for url " + key + - ", using default: " + e.getMessage()); + LOG.warn("Cannot filter init score for url " + key + + ", using default: " + e.getMessage()); } result.setScore(0.0f); } } break; - - case CrawlDatum.STATUS_FETCH_SUCCESS: // succesful fetch - case CrawlDatum.STATUS_FETCH_REDIR_TEMP: // successful fetch, redirected + + case CrawlDatum.STATUS_FETCH_SUCCESS: // succesful fetch + case CrawlDatum.STATUS_FETCH_REDIR_TEMP: // successful fetch, redirected case CrawlDatum.STATUS_FETCH_REDIR_PERM: - case CrawlDatum.STATUS_FETCH_NOTMODIFIED: // successful fetch, notmodified + case CrawlDatum.STATUS_FETCH_NOTMODIFIED: // successful fetch, notmodified // determine the modification status int modified = FetchSchedule.STATUS_UNKNOWN; if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED) { @@ -217,15 +226,18 @@ public class CrawlDbReducer implements R } // set the schedule result = schedule.setFetchSchedule(key, result, prevFetchTime, - prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(), modified); + prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(), + modified); // set the result status and signature if (modified == FetchSchedule.STATUS_NOTMODIFIED) { result.setStatus(CrawlDatum.STATUS_DB_NOTMODIFIED); - // NUTCH-1341 The page is not modified according to its signature, let's reset lastModified as well + // NUTCH-1341 The page is not modified according to its signature, let's + // reset lastModified as well result.setModifiedTime(prevModifiedTime); - if (oldSet) result.setSignature(old.getSignature()); + if (oldSet) + result.setSignature(old.getSignature()); } else { switch (fetch.getStatus()) { case CrawlDatum.STATUS_FETCH_SUCCESS: @@ -238,9 +250,12 @@ public class CrawlDbReducer implements R result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP); break; default: - LOG.warn("Unexpected status: " + fetch.getStatus() + " resetting to old 
status."); - if (oldSet) result.setStatus(old.getStatus()); - else result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); + LOG.warn("Unexpected status: " + fetch.getStatus() + + " resetting to old status."); + if (oldSet) + result.setStatus(old.getStatus()); + else + result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); } result.setSignature(signature); } @@ -262,11 +277,11 @@ public class CrawlDbReducer implements R case CrawlDatum.STATUS_SIGNATURE: if (LOG.isWarnEnabled()) { LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key); - } + } return; - case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure + case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure if (oldSet) { - result.setSignature(old.getSignature()); // use old signature + result.setSignature(old.getSignature()); // use old signature } result = schedule.setPageRetrySchedule(key, result, prevFetchTime, prevModifiedTime, fetch.getFetchTime()); @@ -275,20 +290,21 @@ public class CrawlDbReducer implements R } else { result.setStatus(CrawlDatum.STATUS_DB_GONE); result = schedule.setPageGoneSchedule(key, result, prevFetchTime, - prevModifiedTime, fetch.getFetchTime()); + prevModifiedTime, fetch.getFetchTime()); } break; - case CrawlDatum.STATUS_FETCH_GONE: // permanent failure + case CrawlDatum.STATUS_FETCH_GONE: // permanent failure if (oldSet) - result.setSignature(old.getSignature()); // use old signature + result.setSignature(old.getSignature()); // use old signature result.setStatus(CrawlDatum.STATUS_DB_GONE); result = schedule.setPageGoneSchedule(key, result, prevFetchTime, prevModifiedTime, fetch.getFetchTime()); break; default: - throw new RuntimeException("Unknown status: " + fetch.getStatus() + " " + key); + throw new RuntimeException("Unknown status: " + fetch.getStatus() + " " + + key); } try { @@ -301,22 +317,23 @@ public class CrawlDbReducer implements R // remove generation time, if any result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY); output.collect(key, result); - reporter.getCounter("CrawlDB status", CrawlDatum.getStatusName(result.getStatus())).increment(1); + reporter.getCounter("CrawlDB status", + CrawlDatum.getStatusName(result.getStatus())).increment(1); } - + } class InlinkPriorityQueue extends PriorityQueue<CrawlDatum> { - + public InlinkPriorityQueue(int maxSize) { initialize(maxSize); } - + /** Determines the ordering of objects in this priority queue. **/ protected boolean lessThan(Object arg0, Object arg1) { CrawlDatum candidate = (CrawlDatum) arg0; CrawlDatum least = (CrawlDatum) arg1; return candidate.getScore() > least.getScore(); } - + } Modified: nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java (original) +++ nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java Thu Jan 29 05:38:59 2015 @@ -54,245 +54,244 @@ import org.slf4j.LoggerFactory; * all of them as duplicate except the one with the highest score (based on the * score in the crawldb, which is not necessarily the same as the score * indexed). If two (or more) documents have the same score, then the document - * with the latest timestamp is kept. If the documents have the same timestamp - * then the one with the shortest URL is kept. The documents marked as duplicate can then - * be deleted with the command CleaningJob. 
+ * with the latest timestamp is kept. If the documents have the same timestamp + * then the one with the shortest URL is kept. The documents marked as duplicate + * can then be deleted with the command CleaningJob. ***/ public class DeduplicationJob extends Configured implements Tool { - public static final Logger LOG = LoggerFactory - .getLogger(DeduplicationJob.class); + public static final Logger LOG = LoggerFactory + .getLogger(DeduplicationJob.class); - private final static Text urlKey = new Text("_URLTEMPKEY_"); + private final static Text urlKey = new Text("_URLTEMPKEY_"); - public static class DBFilter implements - Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum> { + public static class DBFilter implements + Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum> { - @Override - public void configure(JobConf arg0) { - } + @Override + public void configure(JobConf arg0) { + } - @Override - public void close() throws IOException { - } + @Override + public void close() throws IOException { + } - @Override - public void map(Text key, CrawlDatum value, - OutputCollector<BytesWritable, CrawlDatum> output, - Reporter reporter) throws IOException { - - if (value.getStatus() == CrawlDatum.STATUS_DB_FETCHED - || value.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) { - // || value.getStatus() ==CrawlDatum.STATUS_DB_GONE){ - byte[] signature = value.getSignature(); - if (signature == null) return; - BytesWritable sig = new BytesWritable(signature); - // add the URL as a temporary MD - value.getMetaData().put(urlKey, key); - // reduce on the signature - output.collect(sig, value); - } - } + @Override + public void map(Text key, CrawlDatum value, + OutputCollector<BytesWritable, CrawlDatum> output, Reporter reporter) + throws IOException { + + if (value.getStatus() == CrawlDatum.STATUS_DB_FETCHED + || value.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) { + // || value.getStatus() ==CrawlDatum.STATUS_DB_GONE){ + byte[] signature = value.getSignature(); + if (signature == null) + return; + BytesWritable sig = new BytesWritable(signature); + // add the URL as a temporary MD + value.getMetaData().put(urlKey, key); + // reduce on the signature + output.collect(sig, value); + } } + } - public static class DedupReducer implements - Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum> { + public static class DedupReducer implements + Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum> { - private void writeOutAsDuplicate(CrawlDatum datum, - OutputCollector<Text, CrawlDatum> output, Reporter reporter) - throws IOException { - datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE); - Text key = (Text) datum.getMetaData().remove(urlKey); - reporter.incrCounter("DeduplicationJobStatus", - "Documents marked as duplicate", 1); - output.collect(key, datum); - } + private void writeOutAsDuplicate(CrawlDatum datum, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE); + Text key = (Text) datum.getMetaData().remove(urlKey); + reporter.incrCounter("DeduplicationJobStatus", + "Documents marked as duplicate", 1); + output.collect(key, datum); + } - @Override - public void reduce(BytesWritable key, Iterator<CrawlDatum> values, - OutputCollector<Text, CrawlDatum> output, Reporter reporter) - throws IOException { - CrawlDatum existingDoc = null; - - while (values.hasNext()) { - if (existingDoc == null) { - existingDoc = new CrawlDatum(); - existingDoc.set(values.next()); - continue; - } - CrawlDatum newDoc = values.next(); - // compare 
based on score - if (existingDoc.getScore() < newDoc.getScore()) { - writeOutAsDuplicate(existingDoc, output, reporter); - existingDoc = new CrawlDatum(); - existingDoc.set(newDoc); - continue; - } else if (existingDoc.getScore() > newDoc.getScore()) { - // mark new one as duplicate - writeOutAsDuplicate(newDoc, output, reporter); - continue; - } - // same score? delete the one which is oldest - if (existingDoc.getFetchTime() > newDoc.getFetchTime()) { - // mark new one as duplicate - writeOutAsDuplicate(newDoc, output, reporter); - continue; - } else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) { - // mark existing one as duplicate - writeOutAsDuplicate(existingDoc, output, reporter); - existingDoc = new CrawlDatum(); - existingDoc.set(newDoc); - continue; - } - // same time? keep the one which has the shortest URL - String urlExisting = existingDoc.getMetaData().get(urlKey).toString(); - String urlnewDoc = newDoc.getMetaData().get(urlKey).toString(); - if (urlExisting.length()<urlnewDoc.length()){ - // mark new one as duplicate - writeOutAsDuplicate(newDoc, output, reporter); - continue; - } - else if (urlExisting.length()>urlnewDoc.length()){ - // mark existing one as duplicate - writeOutAsDuplicate(existingDoc, output, reporter); - existingDoc = new CrawlDatum(); - existingDoc.set(newDoc); - continue; - } - } + @Override + public void reduce(BytesWritable key, Iterator<CrawlDatum> values, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + CrawlDatum existingDoc = null; + + while (values.hasNext()) { + if (existingDoc == null) { + existingDoc = new CrawlDatum(); + existingDoc.set(values.next()); + continue; + } + CrawlDatum newDoc = values.next(); + // compare based on score + if (existingDoc.getScore() < newDoc.getScore()) { + writeOutAsDuplicate(existingDoc, output, reporter); + existingDoc = new CrawlDatum(); + existingDoc.set(newDoc); + continue; + } else if (existingDoc.getScore() > newDoc.getScore()) { + // mark new one as duplicate + writeOutAsDuplicate(newDoc, output, reporter); + continue; + } + // same score? delete the one which is oldest + if (existingDoc.getFetchTime() > newDoc.getFetchTime()) { + // mark new one as duplicate + writeOutAsDuplicate(newDoc, output, reporter); + continue; + } else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) { + // mark existing one as duplicate + writeOutAsDuplicate(existingDoc, output, reporter); + existingDoc = new CrawlDatum(); + existingDoc.set(newDoc); + continue; + } + // same time? keep the one which has the shortest URL + String urlExisting = existingDoc.getMetaData().get(urlKey).toString(); + String urlnewDoc = newDoc.getMetaData().get(urlKey).toString(); + if (urlExisting.length() < urlnewDoc.length()) { + // mark new one as duplicate + writeOutAsDuplicate(newDoc, output, reporter); + continue; + } else if (urlExisting.length() > urlnewDoc.length()) { + // mark existing one as duplicate + writeOutAsDuplicate(existingDoc, output, reporter); + existingDoc = new CrawlDatum(); + existingDoc.set(newDoc); + continue; } + } + } - @Override - public void configure(JobConf arg0) { - } + @Override + public void configure(JobConf arg0) { + } - @Override - public void close() throws IOException { + @Override + public void close() throws IOException { - } } + } - /** Combine multiple new entries for a url. */ - public static class StatusUpdateReducer implements - Reducer<Text, CrawlDatum, Text, CrawlDatum> { + /** Combine multiple new entries for a url. 
*/ + public static class StatusUpdateReducer implements + Reducer<Text, CrawlDatum, Text, CrawlDatum> { - public void configure(JobConf job) { - } + public void configure(JobConf job) { + } - public void close() { - } + public void close() { + } - private CrawlDatum old = new CrawlDatum(); - private CrawlDatum duplicate = new CrawlDatum(); + private CrawlDatum old = new CrawlDatum(); + private CrawlDatum duplicate = new CrawlDatum(); - public void reduce(Text key, Iterator<CrawlDatum> values, - OutputCollector<Text, CrawlDatum> output, Reporter reporter) - throws IOException { - boolean duplicateSet = false; - - while (values.hasNext()) { - CrawlDatum val = values.next(); - if (val.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) { - duplicate.set(val); - duplicateSet = true; - } else { - old.set(val); - } - } - - // keep the duplicate if there is one - if (duplicateSet) { - output.collect(key, duplicate); - return; - } + public void reduce(Text key, Iterator<CrawlDatum> values, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + boolean duplicateSet = false; + + while (values.hasNext()) { + CrawlDatum val = values.next(); + if (val.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) { + duplicate.set(val); + duplicateSet = true; + } else { + old.set(val); + } + } + + // keep the duplicate if there is one + if (duplicateSet) { + output.collect(key, duplicate); + return; + } - // no duplicate? keep old one then - output.collect(key, old); - } + // no duplicate? keep old one then + output.collect(key, old); } + } - public int run(String[] args) throws IOException { - if (args.length < 1) { - System.err.println("Usage: DeduplicationJob <crawldb>"); - return 1; - } + public int run(String[] args) throws IOException { + if (args.length < 1) { + System.err.println("Usage: DeduplicationJob <crawldb>"); + return 1; + } - String crawldb = args[0]; + String crawldb = args[0]; - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - long start = System.currentTimeMillis(); - LOG.info("DeduplicationJob: starting at " + sdf.format(start)); - - Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") - + "/dedup-temp-" - + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - - JobConf job = new NutchJob(getConf()); - - job.setJobName("Deduplication on "+crawldb); - - FileInputFormat.addInputPath(job, new Path(crawldb, - CrawlDb.CURRENT_NAME)); - job.setInputFormat(SequenceFileInputFormat.class); - - FileOutputFormat.setOutputPath(job, tempDir); - job.setOutputFormat(SequenceFileOutputFormat.class); - - job.setMapOutputKeyClass(BytesWritable.class); - job.setMapOutputValueClass(CrawlDatum.class); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(CrawlDatum.class); - - job.setMapperClass(DBFilter.class); - job.setReducerClass(DedupReducer.class); - - try { - RunningJob rj = JobClient.runJob(job); - Group g = rj.getCounters().getGroup("DeduplicationJobStatus"); - if (g != null){ - long dups = g.getCounter("Documents marked as duplicate"); - LOG.info("Deduplication: "+(int)dups+" documents marked as duplicates"); - } - } catch (final Exception e) { - LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e)); - return -1; - } - - // merge with existing crawl db - if (LOG.isInfoEnabled()) { - LOG.info("Deduplication: Updating status of duplicate urls into crawl db."); - } - - Path dbPath = new Path(crawldb); - JobConf mergeJob = CrawlDb.createJob(getConf(), dbPath); - FileInputFormat.addInputPath(mergeJob, tempDir); - 
mergeJob.setReducerClass(StatusUpdateReducer.class); - - try { - JobClient.runJob(mergeJob); - } catch (final Exception e) { - LOG.error("DeduplicationMergeJob: " - + StringUtils.stringifyException(e)); - return -1; - } + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("DeduplicationJob: starting at " + sdf.format(start)); + + Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + + "/dedup-temp-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + JobConf job = new NutchJob(getConf()); + + job.setJobName("Deduplication on " + crawldb); + + FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME)); + job.setInputFormat(SequenceFileInputFormat.class); + + FileOutputFormat.setOutputPath(job, tempDir); + job.setOutputFormat(SequenceFileOutputFormat.class); + + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(CrawlDatum.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(CrawlDatum.class); + + job.setMapperClass(DBFilter.class); + job.setReducerClass(DedupReducer.class); + + try { + RunningJob rj = JobClient.runJob(job); + Group g = rj.getCounters().getGroup("DeduplicationJobStatus"); + if (g != null) { + long dups = g.getCounter("Documents marked as duplicate"); + LOG.info("Deduplication: " + (int) dups + + " documents marked as duplicates"); + } + } catch (final Exception e) { + LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e)); + return -1; + } - CrawlDb.install(mergeJob, dbPath); + // merge with existing crawl db + if (LOG.isInfoEnabled()) { + LOG.info("Deduplication: Updating status of duplicate urls into crawl db."); + } - // clean up - FileSystem fs = FileSystem.get(getConf()); - fs.delete(tempDir, true); - - long end = System.currentTimeMillis(); - LOG.info("Deduplication finished at " + sdf.format(end) - + ", elapsed: " + TimingUtil.elapsedTime(start, end)); - - return 0; - } - - public static void main(String[] args) throws Exception { - int result = ToolRunner.run(NutchConfiguration.create(), - new DeduplicationJob(), args); - System.exit(result); + Path dbPath = new Path(crawldb); + JobConf mergeJob = CrawlDb.createJob(getConf(), dbPath); + FileInputFormat.addInputPath(mergeJob, tempDir); + mergeJob.setReducerClass(StatusUpdateReducer.class); + + try { + JobClient.runJob(mergeJob); + } catch (final Exception e) { + LOG.error("DeduplicationMergeJob: " + StringUtils.stringifyException(e)); + return -1; } + + CrawlDb.install(mergeJob, dbPath); + + // clean up + FileSystem fs = FileSystem.get(getConf()); + fs.delete(tempDir, true); + + long end = System.currentTimeMillis(); + LOG.info("Deduplication finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + + return 0; + } + + public static void main(String[] args) throws Exception { + int result = ToolRunner.run(NutchConfiguration.create(), + new DeduplicationJob(), args); + System.exit(result); + } } Modified: nutch/trunk/src/java/org/apache/nutch/crawl/DefaultFetchSchedule.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/DefaultFetchSchedule.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/crawl/DefaultFetchSchedule.java (original) +++ nutch/trunk/src/java/org/apache/nutch/crawl/DefaultFetchSchedule.java Thu Jan 29 05:38:59 2015 @@ -20,8 +20,8 @@ package 
org.apache.nutch.crawl; import org.apache.hadoop.io.Text; /** - * This class implements the default re-fetch schedule. That is, no matter - * if the page was changed or not, the <code>fetchInterval</code> remains + * This class implements the default re-fetch schedule. That is, no matter if + * the page was changed or not, the <code>fetchInterval</code> remains * unchanged, and the updated page fetchTime will always be set to * <code>fetchTime + fetchInterval * 1000</code>. * @@ -31,14 +31,14 @@ public class DefaultFetchSchedule extend @Override public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum, - long prevFetchTime, long prevModifiedTime, - long fetchTime, long modifiedTime, int state) { + long prevFetchTime, long prevModifiedTime, long fetchTime, + long modifiedTime, int state) { datum = super.setFetchSchedule(url, datum, prevFetchTime, prevModifiedTime, fetchTime, modifiedTime, state); - if (datum.getFetchInterval() == 0 ) { + if (datum.getFetchInterval() == 0) { datum.setFetchInterval(defaultInterval); } - datum.setFetchTime(fetchTime + (long)datum.getFetchInterval() * 1000); + datum.setFetchTime(fetchTime + (long) datum.getFetchInterval() * 1000); datum.setModifiedTime(modifiedTime); return datum; } Modified: nutch/trunk/src/java/org/apache/nutch/crawl/FetchSchedule.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/FetchSchedule.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/crawl/FetchSchedule.java (original) +++ nutch/trunk/src/java/org/apache/nutch/crawl/FetchSchedule.java Thu Jan 29 05:38:59 2015 @@ -21,160 +21,188 @@ import org.apache.hadoop.conf.Configurab import org.apache.hadoop.io.Text; /** - * This interface defines the contract for implementations that manipulate - * fetch times and re-fetch intervals. + * This interface defines the contract for implementations that manipulate fetch + * times and re-fetch intervals. * * @author Andrzej Bialecki */ public interface FetchSchedule extends Configurable { - + /** It is unknown whether page was changed since our last visit. */ - public static final int STATUS_UNKNOWN = 0; + public static final int STATUS_UNKNOWN = 0; /** Page is known to have been modified since our last visit. */ - public static final int STATUS_MODIFIED = 1; + public static final int STATUS_MODIFIED = 1; /** Page is known to remain unmodified since our last visit. */ - public static final int STATUS_NOTMODIFIED = 2; - + public static final int STATUS_NOTMODIFIED = 2; + public static final int SECONDS_PER_DAY = 3600 * 24; + /** - * Initialize fetch schedule related data. Implementations should at least - * set the <code>fetchTime</code> and <code>fetchInterval</code>. The default - * implementation set the <code>fetchTime</code> to now, using the - * default <code>fetchInterval</code>. - * - * @param url URL of the page. - * - * @param datum datum instance to be initialized. - * + * Initialize fetch schedule related data. Implementations should at least set + * the <code>fetchTime</code> and <code>fetchInterval</code>. The default + * implementation set the <code>fetchTime</code> to now, using the default + * <code>fetchInterval</code>. + * + * @param url + * URL of the page. + * + * @param datum + * datum instance to be initialized. + * * @return adjusted page information, including all original information. 
- * NOTE: this may be a different instance than @see CrawlDatum, but - * implementations should make sure that it contains at least all - * information from @see CrawlDatum. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum. */ public CrawlDatum initializeSchedule(Text url, CrawlDatum datum); - + /** * Sets the <code>fetchInterval</code> and <code>fetchTime</code> on a - * successfully fetched page. - * Implementations may use supplied arguments to support different re-fetching - * schedules. - * - * @param url url of the page - * - * @param datum page description to be adjusted. NOTE: this instance, passed by reference, - * may be modified inside the method. - * - * @param prevFetchTime previous value of fetch time, or 0 if not available. - * - * @param prevModifiedTime previous value of modifiedTime, or 0 if not available. - * - * @param fetchTime the latest time, when the page was recently re-fetched. Most FetchSchedule - * implementations should update the value in @see CrawlDatum to something greater than this value. - * - * @param modifiedTime last time the content was modified. This information comes from - * the protocol implementations, or is set to < 0 if not available. Most FetchSchedule - * implementations should update the value in @see CrawlDatum to this value. - * - * @param state if {@link #STATUS_MODIFIED}, then the content is considered to be "changed" before the - * <code>fetchTime</code>, if {@link #STATUS_NOTMODIFIED} then the content is known to be unchanged. - * This information may be obtained by comparing page signatures before and after fetching. If this - * is set to {@link #STATUS_UNKNOWN}, then it is unknown whether the page was changed; implementations - * are free to follow a sensible default behavior. - * - * @return adjusted page information, including all original information. NOTE: this may - * be a different instance than @see CrawlDatum, but implementations should make sure that - * it contains at least all information from @see CrawlDatum}. + * successfully fetched page. Implementations may use supplied arguments to + * support different re-fetching schedules. + * + * @param url + * url of the page + * + * @param datum + * page description to be adjusted. NOTE: this instance, passed by + * reference, may be modified inside the method. + * + * @param prevFetchTime + * previous value of fetch time, or 0 if not available. + * + * @param prevModifiedTime + * previous value of modifiedTime, or 0 if not available. + * + * @param fetchTime + * the latest time, when the page was recently re-fetched. Most + * FetchSchedule implementations should update the value in @see + * CrawlDatum to something greater than this value. + * + * @param modifiedTime + * last time the content was modified. This information comes from + * the protocol implementations, or is set to < 0 if not available. + * Most FetchSchedule implementations should update the value in @see + * CrawlDatum to this value. + * + * @param state + * if {@link #STATUS_MODIFIED}, then the content is considered to be + * "changed" before the <code>fetchTime</code>, if + * {@link #STATUS_NOTMODIFIED} then the content is known to be + * unchanged. This information may be obtained by comparing page + * signatures before and after fetching. 
If this is set to + * {@link #STATUS_UNKNOWN}, then it is unknown whether the page was + * changed; implementations are free to follow a sensible default + * behavior. + * + * @return adjusted page information, including all original information. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum}. */ public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum, - long prevFetchTime, long prevModifiedTime, - long fetchTime, long modifiedTime, int state); - - /** - * This method specifies how to schedule refetching of pages - * marked as GONE. Default implementation increases fetchInterval by 50%, - * and if it exceeds the <code>maxInterval</code> it calls + long prevFetchTime, long prevModifiedTime, long fetchTime, + long modifiedTime, int state); + + /** + * This method specifies how to schedule refetching of pages marked as GONE. + * Default implementation increases fetchInterval by 50%, and if it exceeds + * the <code>maxInterval</code> it calls * {@link #forceRefetch(Text, CrawlDatum, boolean)}. - * - * @param url URL of the page - * - * @param datum datum instance to be adjusted. - * + * + * @param url + * URL of the page + * + * @param datum + * datum instance to be adjusted. + * * @return adjusted page information, including all original information. - * NOTE: this may be a different instance than @see CrawlDatum, but - * implementations should make sure that it contains at least all - * information from @see CrawlDatum. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum. */ public CrawlDatum setPageGoneSchedule(Text url, CrawlDatum datum, - long prevFetchTime, long prevModifiedTime, long fetchTime); - + long prevFetchTime, long prevModifiedTime, long fetchTime); + /** - * This method adjusts the fetch schedule if fetching needs to be - * re-tried due to transient errors. The default implementation - * sets the next fetch time 1 day in the future and increases the - * retry counter. - * - * @param url URL of the page. - * - * @param datum page information. - * - * @param prevFetchTime previous fetch time. - * - * @param prevModifiedTime previous modified time. - * - * @param fetchTime current fetch time. - * + * This method adjusts the fetch schedule if fetching needs to be re-tried due + * to transient errors. The default implementation sets the next fetch time 1 + * day in the future and increases the retry counter. + * + * @param url + * URL of the page. + * + * @param datum + * page information. + * + * @param prevFetchTime + * previous fetch time. + * + * @param prevModifiedTime + * previous modified time. + * + * @param fetchTime + * current fetch time. + * * @return adjusted page information, including all original information. - * NOTE: this may be a different instance than @see CrawlDatum, but - * implementations should make sure that it contains at least all - * information from @see CrawlDatum. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum. */ public CrawlDatum setPageRetrySchedule(Text url, CrawlDatum datum, - long prevFetchTime, long prevModifiedTime, long fetchTime); - + long prevFetchTime, long prevModifiedTime, long fetchTime); + /** * Calculates last fetch time of the given CrawlDatum. 
+ * * @return the date as a long. */ public long calculateLastFetchTime(CrawlDatum datum); /** - * This method provides information whether the page is suitable for - * selection in the current fetchlist. NOTE: a true return value does not - * guarantee that the page will be fetched, it just allows it to be - * included in the further selection process based on scores. The default - * implementation checks <code>fetchTime</code>, if it is higher than the - * curTime it returns false, and true otherwise. It will also - * check that fetchTime is not too remote (more than <code>maxInterval</code), - * in which case it lowers the interval and returns true. - * - * @param url URL of the page. - * - * @param datum datum instance. - * - * @param curTime reference time (usually set to the time when the - * fetchlist generation process was started). - * + * This method provides information whether the page is suitable for selection + * in the current fetchlist. NOTE: a true return value does not guarantee that + * the page will be fetched, it just allows it to be included in the further + * selection process based on scores. The default implementation checks + * <code>fetchTime</code>, if it is higher than the curTime it returns false, + * and true otherwise. It will also check that fetchTime is not too remote + * (more than <code>maxInterval</code), in which case it lowers the interval + * and returns true. + * + * @param url + * URL of the page. + * + * @param datum + * datum instance. + * + * @param curTime + * reference time (usually set to the time when the fetchlist + * generation process was started). + * * @return true, if the page should be considered for inclusion in the current - * fetchlist, otherwise false. + * fetchlist, otherwise false. */ public boolean shouldFetch(Text url, CrawlDatum datum, long curTime); - + /** - * This method resets fetchTime, fetchInterval, modifiedTime and - * page signature, so that it forces refetching. - * - * @param url URL of the page. - * - * @param datum datum instance. - * - * @param asap if true, force refetch as soon as possible - this sets - * the fetchTime to now. If false, force refetch whenever the next fetch - * time is set. - * + * This method resets fetchTime, fetchInterval, modifiedTime and page + * signature, so that it forces refetching. + * + * @param url + * URL of the page. + * + * @param datum + * datum instance. + * + * @param asap + * if true, force refetch as soon as possible - this sets the + * fetchTime to now. If false, force refetch whenever the next fetch + * time is set. + * * @return adjusted page information, including all original information. - * NOTE: this may be a different instance than @see CrawlDatum, but - * implementations should make sure that it contains at least all - * information from @see CrawlDatum. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum. 
*/ public CrawlDatum forceRefetch(Text url, CrawlDatum datum, boolean asap); } Modified: nutch/trunk/src/java/org/apache/nutch/crawl/FetchScheduleFactory.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/FetchScheduleFactory.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/crawl/FetchScheduleFactory.java (original) +++ nutch/trunk/src/java/org/apache/nutch/crawl/FetchScheduleFactory.java Thu Jan 29 05:38:59 2015 @@ -25,20 +25,23 @@ import org.apache.nutch.util.ObjectCache /** Creates and caches a {@link FetchSchedule} implementation. */ public class FetchScheduleFactory { - public static final Logger LOG = LoggerFactory.getLogger(FetchScheduleFactory.class); + public static final Logger LOG = LoggerFactory + .getLogger(FetchScheduleFactory.class); - private FetchScheduleFactory() {} // no public ctor + private FetchScheduleFactory() { + } // no public ctor /** Return the FetchSchedule implementation. */ public synchronized static FetchSchedule getFetchSchedule(Configuration conf) { - String clazz = conf.get("db.fetch.schedule.class", DefaultFetchSchedule.class.getName()); + String clazz = conf.get("db.fetch.schedule.class", + DefaultFetchSchedule.class.getName()); ObjectCache objectCache = ObjectCache.get(conf); - FetchSchedule impl = (FetchSchedule)objectCache.getObject(clazz); + FetchSchedule impl = (FetchSchedule) objectCache.getObject(clazz); if (impl == null) { try { LOG.info("Using FetchSchedule impl: " + clazz); Class<?> implClass = Class.forName(clazz); - impl = (FetchSchedule)implClass.newInstance(); + impl = (FetchSchedule) implClass.newInstance(); impl.setConf(conf); objectCache.setObject(clazz, impl); } catch (Exception e) {