Author: cutting
Date: Thu Aug 18 12:12:31 2005
New Revision: 233359
URL: http://svn.apache.org/viewcvs?rev=233359&view=rev
Log: Add option to parse while fetching; add option to not store content while fetching.
Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java Modified: lucene/nutch/branches/mapred/conf/nutch-default.xml lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java Modified: lucene/nutch/branches/mapred/conf/nutch-default.xml URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/conf/nutch-default.xml?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- lucene/nutch/branches/mapred/conf/nutch-default.xml (original) +++ lucene/nutch/branches/mapred/conf/nutch-default.xml Thu Aug 18 12:12:31 2005 @@ -312,6 +312,18 @@ <description>If true, fetcher will log more verbosely.</description> </property> +<property> + <name>fetcher.parse</name> + <value>true</value> + <description>If true, fetcher will parse content.</description> +</property> + +<property> + <name>fetcher.store.content</name> + <value>true</value> + <description>If true, fetcher will store content.</description> +</property> + <!-- i/o properties --> <property> @@ -391,7 +403,7 @@ <property> <name>mapred.map.tasks</name> - <value>1</value> + <value>2</value> <description>The default number of map tasks per job. Typically set to a prime several times greater than number of available hosts. Ignored when mapred.job.tracker is "local". 
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java Thu Aug 18 12:12:31 2005 @@ -101,7 +101,9 @@ new Generator(conf).generate(crawlDb, segments, -1, topN, System.currentTimeMillis()); new Fetcher(conf).fetch(segment, threads); // fetch it - new ParseSegment(conf).parse(segment); // parse it + if (!Fetcher.isParsing(conf)) { + new ParseSegment(conf).parse(segment); // parse it, if needed + } new CrawlDb(conf).update(crawlDb, segment); // update crawldb } Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Thu Aug 18 12:12:31 2005 @@ -37,6 +37,7 @@ LogFormatter.getLogger("org.apache.nutch.fetcher.Fetcher"); public static final String DIGEST_KEY = "nutch.content.digest"; + public static final String SEGMENT_NAME_KEY = "nutch.segment.name"; public static class InputFormat extends SequenceFileInputFormat { /** Don't split inputs, to keep things polite. 
*/ @@ -55,6 +56,7 @@ private OutputCollector output; private Reporter reporter; + private String segmentName; private int activeThreads; private int maxRedirect; @@ -64,6 +66,9 @@ private int pages; // total pages fetched private int errors; // total pages errored + private boolean storingContent; + private boolean parsing; + private class FetcherThread extends Thread { public void run() { synchronized (Fetcher.this) {activeThreads++;} // count threads @@ -180,9 +185,32 @@ content.getMetadata().setProperty // add digest to metadata (DIGEST_KEY, MD5Hash.digest(content.getContent()).toString()); + content.getMetadata().setProperty // add segment to metadata + (SEGMENT_NAME_KEY, segmentName); + + Parse parse = null; + if (parsing) { + ParseStatus parseStatus; + try { + Parser parser = ParserFactory.getParser(content.getContentType(), + content.getBaseUrl()); + parse = parser.getParse(content); + parseStatus = parse.getData().getStatus(); + } catch (Exception e) { + parseStatus = new ParseStatus(e); + } + if (!parseStatus.isSuccess()) { + LOG.warning("Error parsing: "+key+": "+parseStatus); + parse = null; + } + } try { - output.collect(key, new FetcherOutput(datum, content)); + output.collect + (key, + new FetcherOutput(datum, + storingContent ? content : null, + parse != null ? 
new ParseImpl(parse) : null)); } catch (IOException e) { LOG.severe("fetcher caught:"+e.toString()); } @@ -214,11 +242,24 @@ public void configure(JobConf job) { setConf(job); + + this.segmentName = job.get(SEGMENT_NAME_KEY); + this.storingContent = isStoringContent(job); + this.parsing = isParsing(job); + if (job.getBoolean("fetcher.verbose", false)) { LOG.setLevel(Level.FINE); } } + public static boolean isParsing(NutchConf conf) { + return conf.getBoolean("fetcher.parse", true); + } + + public static boolean isStoringContent(NutchConf conf) { + return conf.getBoolean("fetcher.store.content", true); + } + public void run(RecordReader input, OutputCollector output, Reporter reporter) throws IOException { @@ -253,6 +294,7 @@ JobConf job = new JobConf(getConf()); job.setInt("fetcher.threads.fetch", threads); + job.set(SEGMENT_NAME_KEY, segment.getName()); job.setInputDir(new File(segment, CrawlDatum.GENERATE_DIR_NAME)); job.setInputFormat(InputFormat.class); Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java Thu Aug 18 12:12:31 2005 @@ -22,31 +22,46 @@ import org.apache.nutch.fs.*; import org.apache.nutch.util.*; import org.apache.nutch.protocol.Content; +import org.apache.nutch.parse.*; /* An entry in the fetcher's output. 
*/ public final class FetcherOutput implements Writable { private CrawlDatum crawlDatum; private Content content; + private ParseImpl parse; public FetcherOutput() {} - public FetcherOutput(CrawlDatum crawlDatum, Content content) { + public FetcherOutput(CrawlDatum crawlDatum, Content content, + ParseImpl parse) { this.crawlDatum = crawlDatum; this.content = content; + this.parse = parse; } public final void readFields(DataInput in) throws IOException { this.crawlDatum = CrawlDatum.read(in); - this.content = Content.read(in); + this.content = in.readBoolean() ? Content.read(in) : null; + this.parse = in.readBoolean() ? ParseImpl.read(in) : null; } public final void write(DataOutput out) throws IOException { crawlDatum.write(out); - content.write(out); + + out.writeBoolean(content != null); + if (content != null) { + content.write(out); + } + + out.writeBoolean(parse != null); + if (parse != null) { + parse.write(out); + } } public CrawlDatum getCrawlDatum() { return crawlDatum; } public Content getContent() { return content; } + public ParseImpl getParse() { return parse; } public boolean equals(Object o) { if (!(o instanceof FetcherOutput)) Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java Thu Aug 18 12:12:31 2005 @@ -35,21 +35,32 @@ /** Splits FetcherOutput entries into multiple map files. 
*/ public class FetcherOutputFormat implements OutputFormat { - public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job, - String name) throws IOException { + public RecordWriter getRecordWriter(final NutchFileSystem fs, + final JobConf job, + final String name) throws IOException { - File fetch = + final File fetch = new File(new File(job.getOutputDir(), CrawlDatum.FETCH_DIR_NAME), name); - File content = + final File content = new File(new File(job.getOutputDir(), Content.DIR_NAME), name); final MapFile.Writer fetchOut = new MapFile.Writer(fs, fetch.toString(), UTF8.class, CrawlDatum.class); - final MapFile.Writer contentOut = - new MapFile.Writer(fs, content.toString(), UTF8.class, Content.class); - return new RecordWriter() { + private MapFile.Writer contentOut; + private RecordWriter parseOut; + + { + if (Fetcher.isStoringContent(job)) { + contentOut = new MapFile.Writer(fs, content.toString(), + UTF8.class, Content.class); + } + + if (Fetcher.isParsing(job)) { + parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name); + } + } public void write(WritableComparable key, Writable value) throws IOException { @@ -57,12 +68,25 @@ FetcherOutput fo = (FetcherOutput)value; fetchOut.append(key, fo.getCrawlDatum()); - contentOut.append(key, fo.getContent()); + + if (fo.getContent() != null) { + contentOut.append(key, fo.getContent()); + } + + if (fo.getParse() != null) { + parseOut.write(key, fo.getParse()); + } + } public void close() throws IOException { fetchOut.close(); - contentOut.close(); + if (contentOut != null) { + contentOut.close(); + } + if (parseOut != null) { + parseOut.close(); + } } }; Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java Thu Aug 18 12:12:31 2005 @@ -155,7 +155,7 @@ // add segment, used to map from merged index back to segment files doc.add(Field.UnIndexed("segment", - meta.getProperty(ParseSegment.SEGMENT_NAME_KEY))); + meta.getProperty(Fetcher.SEGMENT_NAME_KEY))); // add digest, used by dedup doc.add(Field.UnIndexed("digest", meta.getProperty(Fetcher.DIGEST_KEY))); Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java?rev=233359&view=auto ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java (added) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java Thu Aug 18 12:12:31 2005 @@ -0,0 +1,92 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import org.apache.nutch.io.*; +import org.apache.nutch.fs.*; +import org.apache.nutch.mapred.*; +import org.apache.nutch.parse.*; +import org.apache.nutch.net.*; + +import java.io.*; +import java.util.*; + +/* Parse content in a segment. 
*/ +public class ParseOutputFormat implements OutputFormat { + + private UrlNormalizer urlNormalizer = UrlNormalizerFactory.getNormalizer(); + + public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job, + String name) throws IOException { + + final float interval = job.getFloat("db.default.fetch.interval", 30f); + + File text = + new File(new File(job.getOutputDir(), ParseText.DIR_NAME), name); + File data = + new File(new File(job.getOutputDir(), ParseData.DIR_NAME), name); + File crawl = + new File(new File(job.getOutputDir(), CrawlDatum.PARSE_DIR_NAME), name); + + final MapFile.Writer textOut = + new MapFile.Writer(fs, text.toString(), UTF8.class, ParseText.class); + + final MapFile.Writer dataOut = + new MapFile.Writer(fs, data.toString(), UTF8.class,ParseData.class,true); + + final SequenceFile.Writer crawlOut = + new SequenceFile.Writer(fs, crawl.toString(), + UTF8.class, CrawlDatum.class); + + return new RecordWriter() { + + public void write(WritableComparable key, Writable value) + throws IOException { + + Parse parse = (Parse)value; + + textOut.append(key, new ParseText(parse.getText())); + dataOut.append(key, parse.getData()); + + // collect outlinks for subsequent db update + Outlink[] links = parse.getData().getOutlinks(); + for (int i = 0; i < links.length; i++) { + String toUrl = links[i].getToUrl(); + try { + toUrl = urlNormalizer.normalize(toUrl); // normalize the url + toUrl = URLFilters.filter(toUrl); // filter the url + } catch (Exception e) { + toUrl = null; + } + if (toUrl != null) + crawlOut.append(new UTF8(toUrl), + new CrawlDatum(CrawlDatum.STATUS_LINKED, + interval)); + } + } + + public void close() throws IOException { + textOut.close(); + dataOut.close(); + crawlOut.close(); + } + + }; + + } + +} Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java Thu Aug 18 12:12:31 2005 @@ -29,19 +29,11 @@ import java.util.logging.*; /* Parse content in a segment. */ -public class ParseSegment - extends NutchConfigured implements Mapper, Reducer, OutputFormat { +public class ParseSegment extends NutchConfigured implements Mapper, Reducer { public static final Logger LOG = LogFormatter.getLogger(Parser.class.getName()); - public static final String SEGMENT_NAME_KEY = "nutch.segment.name"; - - private float interval; - private String segmentName; - - private UrlNormalizer urlNormalizer = UrlNormalizerFactory.getNormalizer(); - public ParseSegment() { super(null); } public ParseSegment(NutchConf conf) { @@ -49,8 +41,6 @@ } public void configure(JobConf job) { - interval = job.getFloat("db.default.fetch.interval", 30f); - segmentName = job.get(SEGMENT_NAME_KEY); } public void map(WritableComparable key, Writable value, @@ -70,7 +60,6 @@ } if (status.isSuccess()) { - parse.getData().getMetadata().setProperty(SEGMENT_NAME_KEY, segmentName); output.collect(key, new ParseImpl(parse.getText(), parse.getData())); } else { LOG.warning("Error parsing: "+key+": "+status.toString()); @@ -83,70 +72,12 @@ output.collect(key, (Writable)values.next()); // collect first value } - public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job, - String name) throws IOException { - File text = - new File(new File(job.getOutputDir(), ParseText.DIR_NAME), name); - File data = - new File(new File(job.getOutputDir(), ParseData.DIR_NAME), name); - File crawl = - new File(new File(job.getOutputDir(), CrawlDatum.PARSE_DIR_NAME), name); - - final 
MapFile.Writer textOut = - new MapFile.Writer(fs, text.toString(), UTF8.class, ParseText.class); - - final MapFile.Writer dataOut = - new MapFile.Writer(fs, data.toString(), UTF8.class,ParseData.class,true); - - final SequenceFile.Writer crawlOut = - new SequenceFile.Writer(fs, crawl.toString(), - UTF8.class, CrawlDatum.class); - - return new RecordWriter() { - - public void write(WritableComparable key, Writable value) - throws IOException { - - Parse parse = (Parse)value; - - textOut.append(key, new ParseText(parse.getText())); - dataOut.append(key, parse.getData()); - - // collect outlinks for subsequent db update - Outlink[] links = parse.getData().getOutlinks(); - for (int i = 0; i < links.length; i++) { - String toUrl = links[i].getToUrl(); - try { - toUrl = urlNormalizer.normalize(toUrl); // normalize the url - toUrl = URLFilters.filter(toUrl); // filter the url - } catch (Exception e) { - toUrl = null; - } - if (toUrl != null) - crawlOut.append(new UTF8(toUrl), - new CrawlDatum(CrawlDatum.STATUS_LINKED, - interval)); - } - } - - public void close() throws IOException { - textOut.close(); - dataOut.close(); - crawlOut.close(); - } - - }; - - } - public void parse(File segment) throws IOException { LOG.info("Parse: starting"); LOG.info("Parse: segment: " + segment); JobConf job = new JobConf(getConf()); - job.set(SEGMENT_NAME_KEY, segment.getName()); - job.setInputDir(new File(segment, Content.DIR_NAME)); job.setInputFormat(SequenceFileInputFormat.class); job.setInputKeyClass(UTF8.class); @@ -155,7 +86,7 @@ job.setReducerClass(ParseSegment.class); job.setOutputDir(segment); - job.setOutputFormat(ParseSegment.class); + job.setOutputFormat(ParseOutputFormat.class); job.setOutputKeyClass(UTF8.class); job.setOutputValueClass(ParseImpl.class); Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java Thu Aug 18 12:12:31 2005 @@ -77,6 +77,7 @@ throws IOException { outs[partitioner.getPartition(key, value, partitions)] .append(key, value); + reportProgress(umbilical); } }; Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Thu Aug 18 12:12:31 2005 @@ -246,6 +246,7 @@ public void collect(WritableComparable key, Writable value) throws IOException { out.write(key, value); + reportProgress(umbilical); } }; Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java?rev=233359&r1=233358&r2=233359&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java Thu Aug 18 12:12:31 2005 @@ -29,6 +29,10 @@ public ParseImpl() {} + public ParseImpl(Parse parse) { + this(parse.getText(), parse.getData()); + } + public ParseImpl(String text, ParseData data) { this(new ParseText(text), data); } @@ -53,6 +57,12 @@ data = new 
ParseData(); data.readFields(in); + } + + public static ParseImpl read(DataInput in) throws IOException { + ParseImpl parseImpl = new ParseImpl(); + parseImpl.readFields(in); + return parseImpl; } }