Author: ab
Date: Fri Sep 22 14:39:24 2006
New Revision: 449097

URL: http://svn.apache.org/viewvc?view=rev&rev=449097
Log:
Use a CombiningCollector when calculating readdb -stats. This drastically
reduces the size of intermediate data, resulting in significant speedups
for large databases.
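For context, the speed-up comes from the combiner hook on the stats job: each map
task pre-aggregates its own counts, so the shuffle carries one partial sum per key
per map task instead of one LongWritable per CrawlDatum. A condensed excerpt of the
relevant wiring from processStatJob() in the new CrawlDbReader (full source in the
diff below):

    // Condensed from CrawlDbReader.processStatJob() below.
    JobConf job = new NutchJob(config);
    job.setMapperClass(CrawlDbStatMapper.class);      // emits ("T",1), ("status <code>",1), ("retry <n>",1), ("s", score*1000)
    job.setCombinerClass(CrawlDbStatCombiner.class);  // new: map-side partial sums plus score min/max/total
    job.setReducerClass(CrawlDbStatReducer.class);    // final totals, min, max and score sum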
Added:
    lucene/nutch/branches/branch-0.8/src/java/org/apache/nutch/crawl/CrawlDbReader.java   (with props)
Modified:
    lucene/nutch/branches/branch-0.8/CHANGES.txt

Modified: lucene/nutch/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/branch-0.8/CHANGES.txt?view=diff&rev=449097&r1=449096&r2=449097
==============================================================================
--- lucene/nutch/branches/branch-0.8/CHANGES.txt (original)
+++ lucene/nutch/branches/branch-0.8/CHANGES.txt Fri Sep 22 14:39:24 2006
@@ -24,6 +24,10 @@
  8. NUTCH-105 - Network error during robots.txt fetch causes file to be
    ignored (Greg Kim via siren)
+
+ 9. Use a CombiningCollector when calculating readdb -stats. This
+    drastically reduces the size of intermediate data, resulting in
+    significant speed-ups for large databases (ab)

 Release 0.8 - 2006-07-25

Added: lucene/nutch/branches/branch-0.8/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/branch-0.8/src/java/org/apache/nutch/crawl/CrawlDbReader.java?view=auto&rev=449097
==============================================================================
--- lucene/nutch/branches/branch-0.8/src/java/org/apache/nutch/crawl/CrawlDbReader.java (added)
+++ lucene/nutch/branches/branch-0.8/src/java/org/apache/nutch/crawl/CrawlDbReader.java Fri Sep 22 14:39:24 2006
@@ -0,0 +1,462 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.TreeMap;
+
+// Commons Logging imports
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Closeable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * Read utility for the CrawlDB.
+ *
+ * @author Andrzej Bialecki
+ *
+ */
+public class CrawlDbReader implements Closeable {
+
+  public static final Log LOG = LogFactory.getLog(CrawlDbReader.class);
+
+  private MapFile.Reader[] readers = null;
+
+  private void openReaders(String crawlDb, Configuration config) throws IOException {
+    if (readers != null) return;
+    FileSystem fs = FileSystem.get(config);
+    readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb, CrawlDatum.DB_DIR_NAME), config);
+  }
+
+  private void closeReaders() {
+    if (readers == null) return;
+    for (int i = 0; i < readers.length; i++) {
+      try {
+        readers[i].close();
+      } catch (Exception e) {
+
+      }
+    }
+  }
+
+  public static class CrawlDbStatMapper implements Mapper {
+    LongWritable COUNT_1 = new LongWritable(1);
+    public void configure(JobConf job) {}
+    public void close() {}
+    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+            throws IOException {
+      CrawlDatum cd = (CrawlDatum) value;
+      output.collect(new UTF8("T"), COUNT_1);
+      output.collect(new UTF8("status " + cd.getStatus()), COUNT_1);
+      output.collect(new UTF8("retry " + cd.getRetriesSinceFetch()), COUNT_1);
+      output.collect(new UTF8("s"), new LongWritable((long) (cd.getScore() * 1000.0)));
+    }
+  }
+
+  public static class CrawlDbStatCombiner implements Reducer {
+    LongWritable val = new LongWritable();
+
+    public CrawlDbStatCombiner() { }
+    public void configure(JobConf job) { }
+    public void close() {}
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+        throws IOException {
+      val.set(0L);
+      String k = ((UTF8)key).toString();
+      if (!k.equals("s")) {
+        while (values.hasNext()) {
+          LongWritable cnt = (LongWritable)values.next();
+          val.set(val.get() + cnt.get());
+        }
+        output.collect(key, val);
+      } else {
+        long total = 0;
+        long min = Long.MAX_VALUE;
+        long max = Long.MIN_VALUE;
+        while (values.hasNext()) {
+          LongWritable cnt = (LongWritable)values.next();
+          if (cnt.get() < min) min = cnt.get();
+          if (cnt.get() > max) max = cnt.get();
+          total += cnt.get();
+        }
+        output.collect(new UTF8("scn"), new LongWritable(min));
+        output.collect(new UTF8("scx"), new LongWritable(max));
+        output.collect(new UTF8("sct"), new LongWritable(total));
+      }
+    }
+  }
+
+  public static class CrawlDbStatReducer implements Reducer {
+    public void configure(JobConf job) {}
+    public void close() {}
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+            throws IOException {
+
+      String k = ((UTF8) key).toString();
+      if (k.equals("T")) {
+        // sum all values for this key
+        long sum = 0;
+        while (values.hasNext()) {
+          sum += ((LongWritable) values.next()).get();
+        }
+        // output sum
+        output.collect(key, new LongWritable(sum));
+      } else if (k.startsWith("status") || k.startsWith("retry")) {
+        LongWritable cnt = new LongWritable();
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("scx")) {
+        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          if (cnt.get() < val.get()) cnt.set(val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("scn")) {
+        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          if (cnt.get() > val.get()) cnt.set(val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("sct")) {
+        LongWritable cnt = new LongWritable();
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        output.collect(key, cnt);
+      }
+    }
+  }
+
+  public static class CrawlDbDumpReducer implements Reducer {
+
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+      while (values.hasNext()) {
+        output.collect(key, (Writable)values.next());
+      }
+    }
+
+    public void configure(JobConf job) {}
+    public void close() {}
+  }
+
+  public static class CrawlDbTopNMapper implements Mapper {
+    private static final FloatWritable fw = new FloatWritable();
+    private float min = 0.0f;
+
+    public void configure(JobConf job) {
+      long lmin = job.getLong("CrawlDbReader.topN.min", 0);
+      if (lmin != 0) {
+        min = (float)lmin / 1000000.0f;
+      }
+    }
+    public void close() {}
+    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+            throws IOException {
+      CrawlDatum datum = (CrawlDatum)value;
+      if (datum.getScore() < min) return; // don't collect low-scoring records
+      fw.set(-datum.getScore()); // reverse sorting order
+      output.collect(fw, key); // invert mapping: score -> url
+    }
+  }
+
+  public static class CrawlDbTopNReducer implements Reducer {
+    private long topN;
+    private long count = 0L;
+
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+      while (values.hasNext() && count < topN) {
+        FloatWritable fw = (FloatWritable)key;
+        fw.set(-fw.get());
+        output.collect(fw, (Writable)values.next());
+        count++;
+      }
+    }
+
+    public void configure(JobConf job) {
+      topN = job.getLong("CrawlDbReader.topN", 100) / job.getNumReduceTasks();
+    }
+
+    public void close() {}
+  }
+
+  public void close() {
+    closeReaders();
+  }
+
+  public void processStatJob(String crawlDb, Configuration config) throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb statistics start: " + crawlDb);
+    }
+
+    Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("stats " + crawlDb);
+
+    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(CrawlDatum.class);
+
+    job.setMapperClass(CrawlDbStatMapper.class);
+    job.setCombinerClass(CrawlDbStatCombiner.class);
+    job.setReducerClass(CrawlDbStatReducer.class);
+
+    job.setOutputPath(tmpFolder);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(LongWritable.class);
+
+    JobClient.runJob(job);
+
+    // reading the result
+    FileSystem fileSystem = FileSystem.get(config);
+    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config, tmpFolder);
+
+    UTF8 key = new UTF8();
+    LongWritable value = new LongWritable();
+
+    TreeMap stats = new TreeMap();
+    for (int i = 0; i < readers.length; i++) {
+      SequenceFile.Reader reader = readers[i];
+      while (reader.next(key, value)) {
+        String k = key.toString();
+        LongWritable val = (LongWritable) stats.get(k);
+        if (val == null) {
+          val = new LongWritable();
+          if (k.equals("scx")) val.set(Long.MIN_VALUE);
+          if (k.equals("scn")) val.set(Long.MAX_VALUE);
+          stats.put(k, val);
+        }
+        if (k.equals("scx")) {
+          if (val.get() < value.get()) val.set(value.get());
+        } else if (k.equals("scn")) {
+          if (val.get() > value.get()) val.set(value.get());
+        } else {
+          val.set(val.get() + value.get());
+        }
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Statistics for CrawlDb: " + crawlDb);
+      LongWritable totalCnt = (LongWritable)stats.get("T");
+      stats.remove("T");
+      LOG.info("TOTAL urls:\t" + totalCnt.get());
+      Iterator it = stats.keySet().iterator();
+      while (it.hasNext()) {
+        String k = (String) it.next();
+        LongWritable val = (LongWritable) stats.get(k);
+        if (k.equals("scn")) {
+          LOG.info("min score:\t" + (float) (val.get() / 1000.0f));
+        } else if (k.equals("scx")) {
+          LOG.info("max score:\t" + (float) (val.get() / 1000.0f));
+        } else if (k.equals("sct")) {
+          LOG.info("avg score:\t" + (float) ((float) (val.get() / (float)totalCnt.get()) / 1000.0f));
+        } else if (k.startsWith("status")) {
+          int code = Integer.parseInt(k.substring(k.indexOf(' ') + 1));
+          LOG.info(k + " (" + CrawlDatum.statNames[code] + "):\t" + val);
+        } else LOG.info(k + ":\t" + val);
+      }
+    }
+    // removing the tmp folder
+    fileSystem.delete(tmpFolder);
+    if (LOG.isInfoEnabled()) { LOG.info("CrawlDb statistics: done"); }
+
+  }
+
+  public CrawlDatum get(String crawlDb, String url, Configuration config) throws IOException {
+    UTF8 key = new UTF8(url);
+    CrawlDatum val = new CrawlDatum();
+    openReaders(crawlDb, config);
+    CrawlDatum res = (CrawlDatum)MapFileOutputFormat.getEntry(readers, new HashPartitioner(), key, val);
+    return res;
+  }
+
+  public void readUrl(String crawlDb, String url, Configuration config) throws IOException {
+    CrawlDatum res = get(crawlDb, url, config);
+    System.out.println("URL: " + url);
+    if (res != null) {
+      System.out.println(res);
+    } else {
+      System.out.println("not found");
+    }
+  }
+
+  public void processDumpJob(String crawlDb, String output, Configuration config) throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb dump: starting");
+      LOG.info("CrawlDb db: " + crawlDb);
+    }
+
+    Path outFolder = new Path(output);
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("dump " + crawlDb);
+
+    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(CrawlDatum.class);
+
+    job.setOutputPath(outFolder);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(CrawlDatum.class);
+
+    JobClient.runJob(job);
+    if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); }
+  }
+
+  public void processTopNJob(String crawlDb, long topN, float min, String output, Configuration config) throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
+      LOG.info("CrawlDb db: " + crawlDb);
+    }
+
+    Path outFolder = new Path(output);
+    Path tempDir =
+      new Path(config.get("mapred.temp.dir", ".") +
+               "/readdb-topN-temp-"+
+               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("topN prepare " + crawlDb);
+    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(CrawlDatum.class);
+    job.setMapperClass(CrawlDbTopNMapper.class);
+    job.setReducerClass(IdentityReducer.class);
+
+    job.setOutputPath(tempDir);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(FloatWritable.class);
+    job.setOutputValueClass(UTF8.class);
+
+    // XXX hmmm, no setFloat() in the API ... :(
+    job.setLong("CrawlDbReader.topN.min", Math.round(1000000.0 * min));
+    JobClient.runJob(job);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb topN: collecting topN scores.");
+    }
+    job = new NutchJob(config);
+    job.setJobName("topN collect " + crawlDb);
+    job.setLong("CrawlDbReader.topN", topN);
+
+    job.addInputPath(tempDir);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(FloatWritable.class);
+    job.setInputValueClass(UTF8.class);
+    job.setMapperClass(IdentityMapper.class);
+    job.setReducerClass(CrawlDbTopNReducer.class);
+
+    job.setOutputPath(outFolder);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputKeyClass(FloatWritable.class);
+    job.setOutputValueClass(UTF8.class);
+
+    // XXX *sigh* this apparently doesn't work ... :-((
+    job.setNumReduceTasks(1); // create a single file.
+
+    JobClient.runJob(job);
+    FileSystem fs = FileSystem.get(config);
+    fs.delete(tempDir);
+    if (LOG.isInfoEnabled()) { LOG.info("CrawlDb topN: done"); }
+
+  }
+
+  public static void main(String[] args) throws IOException {
+    CrawlDbReader dbr = new CrawlDbReader();
+
+    if (args.length < 1) {
+      System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
+      System.err.println("\t<crawldb>\tdirectory name where crawldb is located");
+      System.err.println("\t-stats\tprint overall statistics to System.out");
+      System.err.println("\t-dump <out_dir>\tdump the whole db to a text file in <out_dir>");
+      System.err.println("\t-url <url>\tprint information on <url> to System.out");
+      System.err.println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
+      System.err.println("\t\t[<min>]\tskip records with scores below this value.");
+      System.err.println("\t\t\tThis can significantly improve performance.");
+      return;
+    }
+    String param = null;
+    String crawlDb = args[0];
+    Configuration conf = NutchConfiguration.create();
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-stats")) {
+        dbr.processStatJob(crawlDb, conf);
+      } else if (args[i].equals("-dump")) {
+        param = args[++i];
+        dbr.processDumpJob(crawlDb, param, conf);
+      } else if (args[i].equals("-url")) {
+        param = args[++i];
+        dbr.readUrl(crawlDb, param, conf);
+      } else if (args[i].equals("-topN")) {
+        param = args[++i];
+        long topN = Long.parseLong(param);
+        param = args[++i];
+        float min = 0.0f;
+        if (i < args.length - 1) {
+          min = Float.parseFloat(args[++i]);
+        }
+        dbr.processTopNJob(crawlDb, topN, min, param, conf);
+      } else {
+        System.err.println("\nError: wrong argument " + args[i]);
+      }
+    }
+    return;
+  }
+}

Propchange: lucene/nutch/branches/branch-0.8/src/java/org/apache/nutch/crawl/CrawlDbReader.java
------------------------------------------------------------------------------
    svn:eol-style = native
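For reference, the stats job that this commit speeds up can also be driven
programmatically with just the API added above; a minimal sketch, assuming an
existing crawldb (the path below is a placeholder):

    // Hypothetical driver; mirrors what main() does for the -stats option.
    Configuration conf = NutchConfiguration.create();
    CrawlDbReader dbr = new CrawlDbReader();
    try {
      dbr.processStatJob("crawl/crawldb", conf);  // placeholder crawldb path
    } finally {
      dbr.close();  // closes any MapFile readers opened by get()/readUrl()
    }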