Author: jnioche Date: Fri May 16 07:59:05 2014 New Revision: 1595137 URL: http://svn.apache.org/r1595137 Log: NUTCH-1772 Injector does not need merging if no pre-existing crawldb
Modified: nutch/trunk/CHANGES.txt nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java Modified: nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1595137&r1=1595136&r2=1595137&view=diff ============================================================================== --- nutch/trunk/CHANGES.txt (original) +++ nutch/trunk/CHANGES.txt Fri May 16 07:59:05 2014 @@ -2,6 +2,8 @@ Nutch Change Log Nutch Current Development +* NUTCH-1772 Injector does not need merging if no pre-existing crawldb (jnioche) + * NUTCH-1752 Cache robots.txt rules per protocol:host:port (snagel) * NUTCH-1613 Timeouts in protocol-httpclient when crawling same host with >2 threads (brian44 via jnioche) Modified: nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java?rev=1595137&r1=1595136&r2=1595137&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java (original) +++ nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java Fri May 16 07:59:05 2014 @@ -39,28 +39,38 @@ import org.apache.nutch.util.NutchConfig import org.apache.nutch.util.NutchJob; import org.apache.nutch.util.TimingUtil; -/** This class takes a flat file of URLs and adds them to the of pages to be - * crawled. Useful for bootstrapping the system. - * The URL files contain one URL per line, optionally followed by custom metadata - * separated by tabs with the metadata key separated from the corresponding value by '='. <br> +/** + * This class takes a flat file of URLs and adds them to the list of pages to be + * crawled. Useful for bootstrapping the system. The URL files contain one URL + * per line, optionally followed by custom metadata separated by tabs with the + * metadata key separated from the corresponding value by '='.
<br> * Note that some metadata keys are reserved : <br> * - <i>nutch.score</i> : allows to set a custom score for a specific URL <br> - * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a specific URL <br> - * - <i>nutch.fetchInterval.fixed</i> : allows to set a custom fetch interval for a specific URL that is not changed by AdaptiveFetchSchedule <br> - * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source + * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a + * specific URL <br> + * - <i>nutch.fetchInterval.fixed</i> : allows to set a custom fetch interval + * for a specific URL that is not changed by AdaptiveFetchSchedule <br> + * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 + * \t userType=open_source **/ public class Injector extends Configured implements Tool { public static final Logger LOG = LoggerFactory.getLogger(Injector.class); - + /** metadata key reserved for setting a custom score for a specific URL */ public static String nutchScoreMDName = "nutch.score"; - /** metadata key reserved for setting a custom fetchInterval for a specific URL */ + /** + * metadata key reserved for setting a custom fetchInterval for a specific URL + */ public static String nutchFetchIntervalMDName = "nutch.fetchInterval"; - /** metadata key reserved for setting a fixed custom fetchInterval for a specific URL */ + /** + * metadata key reserved for setting a fixed custom fetchInterval for a + * specific URL + */ public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed"; /** Normalize and filter injected urls. 
*/ - public static class InjectMapper implements Mapper<WritableComparable<?>, Text, Text, CrawlDatum> { + public static class InjectMapper implements + Mapper<WritableComparable<?>, Text, Text, CrawlDatum> { private URLNormalizers urlNormalizers; private int interval; private float scoreInjected; @@ -76,19 +86,21 @@ public class Injector extends Configured filters = new URLFilters(jobConf); scfilters = new ScoringFilters(jobConf); scoreInjected = jobConf.getFloat("db.score.injected", 1.0f); - curTime = job.getLong("injector.current.time", System.currentTimeMillis()); + curTime = job + .getLong("injector.current.time", System.currentTimeMillis()); } - public void close() {} + public void close() { + } public void map(WritableComparable<?> key, Text value, - OutputCollector<Text, CrawlDatum> output, Reporter reporter) - throws IOException { - String url = value.toString().trim(); // value is line of text - - if (url != null && ( url.length() == 0 || url.startsWith("#") ) ) { - /* Ignore line that start with # */ - return; + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + String url = value.toString().trim(); // value is line of text + + if (url != null && (url.length() == 0 || url.startsWith("#"))) { + /* Ignore line that start with # */ + return; } // if tabs : metadata that could be stored @@ -96,55 +108,60 @@ public class Injector extends Configured float customScore = -1f; int customInterval = interval; int fixedInterval = -1; - Map<String,String> metadata = new TreeMap<String,String>(); - if (url.indexOf("\t")!=-1){ - String[] splits = url.split("\t"); - url = splits[0]; - for (int s=1;s<splits.length;s++){ - // find separation between name and value - int indexEquals = splits[s].indexOf("="); - if (indexEquals==-1) { - // skip anything without a = - continue; - } - String metaname = splits[s].substring(0, indexEquals); - String metavalue = splits[s].substring(indexEquals+1); - if (metaname.equals(nutchScoreMDName)) { - try 
{ - customScore = Float.parseFloat(metavalue);} - catch (NumberFormatException nfe){} - } - else if (metaname.equals(nutchFetchIntervalMDName)) { - try { - customInterval = Integer.parseInt(metavalue);} - catch (NumberFormatException nfe){} - } - else if (metaname.equals(nutchFixedFetchIntervalMDName)) { - try { - fixedInterval = Integer.parseInt(metavalue);} - catch (NumberFormatException nfe){} - } - else metadata.put(metaname,metavalue); - } + Map<String, String> metadata = new TreeMap<String, String>(); + if (url.indexOf("\t") != -1) { + String[] splits = url.split("\t"); + url = splits[0]; + for (int s = 1; s < splits.length; s++) { + // find separation between name and value + int indexEquals = splits[s].indexOf("="); + if (indexEquals == -1) { + // skip anything without a = + continue; + } + String metaname = splits[s].substring(0, indexEquals); + String metavalue = splits[s].substring(indexEquals + 1); + if (metaname.equals(nutchScoreMDName)) { + try { + customScore = Float.parseFloat(metavalue); + } catch (NumberFormatException nfe) { + } + } else if (metaname.equals(nutchFetchIntervalMDName)) { + try { + customInterval = Integer.parseInt(metavalue); + } catch (NumberFormatException nfe) { + } + } else if (metaname.equals(nutchFixedFetchIntervalMDName)) { + try { + fixedInterval = Integer.parseInt(metavalue); + } catch (NumberFormatException nfe) { + } + } else + metadata.put(metaname, metavalue); + } } try { url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); - url = filters.filter(url); // filter the url + url = filters.filter(url); // filter the url } catch (Exception e) { - if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); } + if (LOG.isWarnEnabled()) { + LOG.warn("Skipping " + url + ":" + e); + } url = null; } if (url == null) { reporter.getCounter("injector", "urls_filtered").increment(1); - } else { // if it passes - value.set(url); // collect it + } else { // if it passes + value.set(url); // collect it CrawlDatum datum = 
new CrawlDatum(); datum.setStatus(CrawlDatum.STATUS_INJECTED); // Is interval custom? Then set as meta data if (fixedInterval > -1) { - // Set writable using float. Flaot is used by AdaptiveFetchSchedule - datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY, new FloatWritable(fixedInterval)); + // Set writable using float. Float is used by + // AdaptiveFetchSchedule + datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY, + new FloatWritable(fixedInterval)); datum.setFetchInterval(fixedInterval); } else { datum.setFetchInterval(customInterval); @@ -153,20 +170,22 @@ public class Injector extends Configured datum.setFetchTime(curTime); // now add the metadata Iterator<String> keysIter = metadata.keySet().iterator(); - while (keysIter.hasNext()){ - String keymd = keysIter.next(); - String valuemd = metadata.get(keymd); - datum.getMetaData().put(new Text(keymd), new Text(valuemd)); + while (keysIter.hasNext()) { + String keymd = keysIter.next(); + String valuemd = metadata.get(keymd); + datum.getMetaData().put(new Text(keymd), new Text(valuemd)); } - if (customScore != -1) datum.setScore(customScore); - else datum.setScore(scoreInjected); + if (customScore != -1) + datum.setScore(customScore); + else + datum.setScore(scoreInjected); try { - scfilters.injectedScore(value, datum); + scfilters.injectedScore(value, datum); } catch (ScoringFilterException e) { - if (LOG.isWarnEnabled()) { - LOG.warn("Cannot filter injected score for url " + url - + ", using default (" + e.getMessage() + ")"); - } + if (LOG.isWarnEnabled()) { + LOG.warn("Cannot filter injected score for url " + url + + ", using default (" + e.getMessage() + ")"); + } } reporter.getCounter("injector", "urls_injected").increment(1); output.collect(value, datum); @@ -175,7 +194,8 @@ public class Injector extends Configured } /** Combine multiple new entries for a url.
*/ - public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> { + public static class InjectReducer implements + Reducer<Text, CrawlDatum, Text, CrawlDatum> { private int interval; private float scoreInjected; private boolean overwrite = false; @@ -189,15 +209,16 @@ public class Injector extends Configured LOG.info("Injector: overwrite: " + overwrite); LOG.info("Injector: update: " + update); } - - public void close() {} + + public void close() { + } private CrawlDatum old = new CrawlDatum(); private CrawlDatum injected = new CrawlDatum(); - + public void reduce(Text key, Iterator<CrawlDatum> values, - OutputCollector<Text, CrawlDatum> output, Reporter reporter) - throws IOException { + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { boolean oldSet = false; boolean injectedSet = false; while (values.hasNext()) { @@ -210,29 +231,35 @@ public class Injector extends Configured old.set(val); oldSet = true; } + } CrawlDatum res = null; - + // Old default behaviour if (injectedSet && !oldSet) { res = injected; } else { res = old; } - + if (injectedSet && oldSet) { + reporter.getCounter("injector", "urls_merged").increment(1); + } /** * Whether to overwrite, ignore or update existing records + * * @see https://issues.apache.org/jira/browse/NUTCH-1405 */ // Injected record already exists and update but not overwrite if (injectedSet && oldSet && update && !overwrite) { res = old; old.putAllMetaData(injected); - old.setScore(injected.getScore() != scoreInjected ? injected.getScore() : old.getScore()); - old.setFetchInterval(injected.getFetchInterval() != interval ? injected.getFetchInterval() : old.getFetchInterval()); + old.setScore(injected.getScore() != scoreInjected ? injected.getScore() + : old.getScore()); + old.setFetchInterval(injected.getFetchInterval() != interval ? 
injected + .getFetchInterval() : old.getFetchInterval()); } - + // Injected record already exists and overwrite if (injectedSet && oldSet && overwrite) { res = injected; @@ -242,12 +269,13 @@ public class Injector extends Configured } } - public Injector() {} - + public Injector() { + } + public Injector(Configuration conf) { setConf(conf); } - + public void inject(Path crawlDb, Path urlDir) throws IOException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = System.currentTimeMillis(); @@ -257,56 +285,92 @@ public class Injector extends Configured LOG.info("Injector: urlDir: " + urlDir); } - Path tempDir = - new Path(getConf().get("mapred.temp.dir", ".") + - "/inject-temp-"+ - Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + + "/inject-temp-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); // map text input file to a <url,CrawlDatum> file if (LOG.isInfoEnabled()) { LOG.info("Injector: Converting injected urls to crawl db entries."); } + + FileSystem fs = FileSystem.get(getConf()); + // determine if the crawldb already exists + boolean dbExists = fs.exists(crawlDb); + JobConf sortJob = new NutchJob(getConf()); sortJob.setJobName("inject " + urlDir); FileInputFormat.addInputPath(sortJob, urlDir); sortJob.setMapperClass(InjectMapper.class); FileOutputFormat.setOutputPath(sortJob, tempDir); - sortJob.setOutputFormat(SequenceFileOutputFormat.class); + if (dbExists) { + // Don't run merge injected urls, wait for merge with + // existing DB + sortJob.setOutputFormat(SequenceFileOutputFormat.class); + sortJob.setNumReduceTasks(0); + } else { + sortJob.setOutputFormat(MapFileOutputFormat.class); + sortJob.setReducerClass(InjectReducer.class); + sortJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", + false); + } sortJob.setOutputKeyClass(Text.class); sortJob.setOutputValueClass(CrawlDatum.class); 
sortJob.setLong("injector.current.time", System.currentTimeMillis()); - RunningJob mapJob = JobClient.runJob(sortJob); - long urlsInjected = mapJob.getCounters().findCounter("injector", "urls_injected").getValue(); - long urlsFiltered = mapJob.getCounters().findCounter("injector", "urls_filtered").getValue(); - LOG.info("Injector: total number of urls rejected by filters: " + urlsFiltered); - LOG.info("Injector: total number of urls injected after normalization and filtering: " + RunningJob mapJob = null; + try { + mapJob = JobClient.runJob(sortJob); + } catch (IOException e) { + fs.delete(tempDir, true); + throw e; + } + long urlsInjected = mapJob.getCounters() + .findCounter("injector", "urls_injected").getValue(); + long urlsFiltered = mapJob.getCounters() + .findCounter("injector", "urls_filtered").getValue(); + LOG.info("Injector: Total number of urls rejected by filters: " + + urlsFiltered); + LOG.info("Injector: Total number of urls after normalization: " + urlsInjected); - - // merge with existing crawl db - if (LOG.isInfoEnabled()) { - LOG.info("Injector: Merging injected urls into crawl db."); + long urlsMerged = 0; + if (dbExists) { + // merge with existing crawl db + if (LOG.isInfoEnabled()) { + LOG.info("Injector: Merging injected urls into crawl db."); + } + JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); + FileInputFormat.addInputPath(mergeJob, tempDir); + mergeJob.setReducerClass(InjectReducer.class); + try { + RunningJob merge = JobClient.runJob(mergeJob); + urlsMerged = merge.getCounters().findCounter("injector", "urls_merged") + .getValue(); + LOG.info("Injector: URLs merged: " + urlsMerged); + } catch (IOException e) { + fs.delete(tempDir, true); + throw e; + } + CrawlDb.install(mergeJob, crawlDb); + } else { + CrawlDb.install(sortJob, crawlDb); } - JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); - FileInputFormat.addInputPath(mergeJob, tempDir); - mergeJob.setReducerClass(InjectReducer.class); - JobClient.runJob(mergeJob); - 
CrawlDb.install(mergeJob, crawlDb); // clean up - FileSystem fs = FileSystem.get(getConf()); fs.delete(tempDir, true); - + LOG.info("Injector: Total new urls injected: " + + (urlsInjected - urlsMerged)); long end = System.currentTimeMillis(); - LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end)); + LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); } public static void main(String[] args) throws Exception { int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args); System.exit(res); } - + public int run(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: Injector <crawldb> <url_dir>");