Author: cutting
Date: Thu Aug 18 12:12:31 2005
New Revision: 233359

URL: http://svn.apache.org/viewcvs?rev=233359&view=rev
Log:
Add option to parse while fetching; add option to not store content while 
fetching.

Added:
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java
Modified:
    lucene/nutch/branches/mapred/conf/nutch-default.xml
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java

Modified: lucene/nutch/branches/mapred/conf/nutch-default.xml
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/conf/nutch-default.xml?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/conf/nutch-default.xml (original)
+++ lucene/nutch/branches/mapred/conf/nutch-default.xml Thu Aug 18 12:12:31 2005
@@ -312,6 +312,18 @@
   <description>If true, fetcher will log more verbosely.</description>
 </property>
 
+<property>
+  <name>fetcher.parse</name>
+  <value>true</value>
+  <description>If true, fetcher will parse content.</description>
+</property>
+
+<property>
+  <name>fetcher.store.content</name>
+  <value>true</value>
+  <description>If true, fetcher will store content.</description>
+</property>
+
 <!-- i/o properties -->
 
 <property>
@@ -391,7 +403,7 @@
 
 <property>
   <name>mapred.map.tasks</name>
-  <value>1</value>
+  <value>2</value>
   <description>The default number of map tasks per job.  Typically set
   to a prime several times greater than number of available hosts.
   Ignored when mapred.job.tracker is "local".  

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java Thu 
Aug 18 12:12:31 2005
@@ -101,7 +101,9 @@
         new Generator(conf).generate(crawlDb, segments, -1,
                                      topN, System.currentTimeMillis());
       new Fetcher(conf).fetch(segment, threads);  // fetch it
-      new ParseSegment(conf).parse(segment);      // parse it
+      if (!Fetcher.isParsing(conf)) {
+        new ParseSegment(conf).parse(segment);    // parse it, if needed
+      }
       new CrawlDb(conf).update(crawlDb, segment); // update crawldb
     }
       

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java 
Thu Aug 18 12:12:31 2005
@@ -37,6 +37,7 @@
     LogFormatter.getLogger("org.apache.nutch.fetcher.Fetcher");
   
   public static final String DIGEST_KEY = "nutch.content.digest";
+  public static final String SEGMENT_NAME_KEY = "nutch.segment.name";
 
   public static class InputFormat extends SequenceFileInputFormat {
     /** Don't split inputs, to keep things polite. */
@@ -55,6 +56,7 @@
   private OutputCollector output;
   private Reporter reporter;
 
+  private String segmentName;
   private int activeThreads;
   private int maxRedirect;
 
@@ -64,6 +66,9 @@
   private int pages;                              // total pages fetched
   private int errors;                             // total pages errored
 
+  private boolean storingContent;
+  private boolean parsing;
+
   private class FetcherThread extends Thread {
     public void run() {
       synchronized (Fetcher.this) {activeThreads++;} // count threads
@@ -180,9 +185,32 @@
 
       content.getMetadata().setProperty           // add digest to metadata
         (DIGEST_KEY, MD5Hash.digest(content.getContent()).toString());
+      content.getMetadata().setProperty           // add segment to metadata
+        (SEGMENT_NAME_KEY, segmentName);
+
+      Parse parse = null;
+      if (parsing) {
+        ParseStatus parseStatus;
+        try {
+          Parser parser = ParserFactory.getParser(content.getContentType(),
+                                                  content.getBaseUrl());
+          parse = parser.getParse(content);
+          parseStatus = parse.getData().getStatus();
+        } catch (Exception e) {
+          parseStatus = new ParseStatus(e);
+        }
+        if (!parseStatus.isSuccess()) {
+          LOG.warning("Error parsing: "+key+": "+parseStatus);
+          parse = null;
+        }
+      }
 
       try {
-        output.collect(key, new FetcherOutput(datum, content));
+        output.collect
+          (key,
+           new FetcherOutput(datum,
+                             storingContent ? content : null,
+                             parse != null ? new ParseImpl(parse) : null));
       } catch (IOException e) {
         LOG.severe("fetcher caught:"+e.toString());
       }
@@ -214,11 +242,24 @@
 
   public void configure(JobConf job) {
     setConf(job);
+
+    this.segmentName = job.get(SEGMENT_NAME_KEY);
+    this.storingContent = isStoringContent(job);
+    this.parsing = isParsing(job);
+
     if (job.getBoolean("fetcher.verbose", false)) {
       LOG.setLevel(Level.FINE);
     }
   }
 
+  public static boolean isParsing(NutchConf conf) {
+    return conf.getBoolean("fetcher.parse", true);
+  }
+
+  public static boolean isStoringContent(NutchConf conf) {
+    return conf.getBoolean("fetcher.store.content", true);
+  }
+
   public void run(RecordReader input, OutputCollector output,
                   Reporter reporter) throws IOException {
 
@@ -253,6 +294,7 @@
     JobConf job = new JobConf(getConf());
 
     job.setInt("fetcher.threads.fetch", threads);
+    job.set(SEGMENT_NAME_KEY, segment.getName());
 
     job.setInputDir(new File(segment, CrawlDatum.GENERATE_DIR_NAME));
     job.setInputFormat(InputFormat.class);

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java 
Thu Aug 18 12:12:31 2005
@@ -22,31 +22,46 @@
 import org.apache.nutch.fs.*;
 import org.apache.nutch.util.*;
 import org.apache.nutch.protocol.Content;
+import org.apache.nutch.parse.*;
 
 /* An entry in the fetcher's output. */
 public final class FetcherOutput implements Writable {
   private CrawlDatum crawlDatum;
   private Content content;
+  private ParseImpl parse;
 
   public FetcherOutput() {}
 
-  public FetcherOutput(CrawlDatum crawlDatum, Content content) {
+  public FetcherOutput(CrawlDatum crawlDatum, Content content,
+                       ParseImpl parse) {
     this.crawlDatum = crawlDatum;
     this.content = content;
+    this.parse = parse;
   }
 
   public final void readFields(DataInput in) throws IOException {
     this.crawlDatum = CrawlDatum.read(in);
-    this.content = Content.read(in);
+    this.content = in.readBoolean() ? Content.read(in) : null;
+    this.parse = in.readBoolean() ? ParseImpl.read(in) : null;
   }
 
   public final void write(DataOutput out) throws IOException {
     crawlDatum.write(out);
-    content.write(out);
+
+    out.writeBoolean(content != null);
+    if (content != null) {
+      content.write(out);
+    }
+
+    out.writeBoolean(parse != null);
+    if (parse != null) {
+      parse.write(out);
+    }
   }
 
   public CrawlDatum getCrawlDatum() { return crawlDatum; }
   public Content getContent() { return content; }
+  public ParseImpl getParse() { return parse; }
 
   public boolean equals(Object o) {
     if (!(o instanceof FetcherOutput))

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
 Thu Aug 18 12:12:31 2005
@@ -35,21 +35,32 @@
 /** Splits FetcherOutput entries into multiple map files. */
 public class FetcherOutputFormat implements OutputFormat {
 
-  public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
-                                      String name) throws IOException {
+  public RecordWriter getRecordWriter(final NutchFileSystem fs,
+                                      final JobConf job,
+                                      final String name) throws IOException {
 
-    File fetch =
+    final File fetch =
       new File(new File(job.getOutputDir(), CrawlDatum.FETCH_DIR_NAME), name);
-    File content =
+    final File content =
       new File(new File(job.getOutputDir(), Content.DIR_NAME), name);
 
     final MapFile.Writer fetchOut =
       new MapFile.Writer(fs, fetch.toString(), UTF8.class, CrawlDatum.class);
     
-    final MapFile.Writer contentOut =
-      new MapFile.Writer(fs, content.toString(), UTF8.class, Content.class);
-
     return new RecordWriter() {
+        private MapFile.Writer contentOut;
+        private RecordWriter parseOut;
+
+        {
+          if (Fetcher.isStoringContent(job)) {
+            contentOut = new MapFile.Writer(fs, content.toString(),
+                                            UTF8.class, Content.class);
+          }
+
+          if (Fetcher.isParsing(job)) {
+            parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name);
+          }
+        }
 
         public void write(WritableComparable key, Writable value)
           throws IOException {
@@ -57,12 +68,25 @@
           FetcherOutput fo = (FetcherOutput)value;
           
           fetchOut.append(key, fo.getCrawlDatum());
-          contentOut.append(key, fo.getContent());
+
+          if (fo.getContent() != null) {
+            contentOut.append(key, fo.getContent());
+          }
+
+          if (fo.getParse() != null) {
+            parseOut.write(key, fo.getParse());
+          }
+
         }
 
         public void close() throws IOException {
           fetchOut.close();
-          contentOut.close();
+          if (contentOut != null) {
+            contentOut.close();
+          }
+          if (parseOut != null) {
+            parseOut.close();
+          }
         }
 
       };

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java 
Thu Aug 18 12:12:31 2005
@@ -155,7 +155,7 @@
 
     // add segment, used to map from merged index back to segment files
     doc.add(Field.UnIndexed("segment",
-                            meta.getProperty(ParseSegment.SEGMENT_NAME_KEY)));
+                            meta.getProperty(Fetcher.SEGMENT_NAME_KEY)));
 
     // add digest, used by dedup
     doc.add(Field.UnIndexed("digest", meta.getProperty(Fetcher.DIGEST_KEY)));

Added: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java?rev=233359&view=auto
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java
 (added)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseOutputFormat.java
 Thu Aug 18 12:12:31 2005
@@ -0,0 +1,92 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.net.*;
+
+import java.io.*;
+import java.util.*;
+
+/* Parse content in a segment. */
+public class ParseOutputFormat implements OutputFormat {
+
+  private UrlNormalizer urlNormalizer = UrlNormalizerFactory.getNormalizer();
+
+  public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
+                                      String name) throws IOException {
+
+    final float interval = job.getFloat("db.default.fetch.interval", 30f);
+
+    File text =
+      new File(new File(job.getOutputDir(), ParseText.DIR_NAME), name);
+    File data =
+      new File(new File(job.getOutputDir(), ParseData.DIR_NAME), name);
+    File crawl =
+      new File(new File(job.getOutputDir(), CrawlDatum.PARSE_DIR_NAME), name);
+    
+    final MapFile.Writer textOut =
+      new MapFile.Writer(fs, text.toString(), UTF8.class, ParseText.class);
+    
+    final MapFile.Writer dataOut =
+      new MapFile.Writer(fs, data.toString(), UTF8.class,ParseData.class,true);
+    
+    final SequenceFile.Writer crawlOut =
+      new SequenceFile.Writer(fs, crawl.toString(),
+                              UTF8.class, CrawlDatum.class);
+    
+    return new RecordWriter() {
+
+        public void write(WritableComparable key, Writable value)
+          throws IOException {
+          
+          Parse parse = (Parse)value;
+          
+          textOut.append(key, new ParseText(parse.getText()));
+          dataOut.append(key, parse.getData());
+
+          // collect outlinks for subsequent db update
+          Outlink[] links = parse.getData().getOutlinks();
+          for (int i = 0; i < links.length; i++) {
+            String toUrl = links[i].getToUrl();
+            try {
+              toUrl = urlNormalizer.normalize(toUrl); // normalize the url
+              toUrl = URLFilters.filter(toUrl);   // filter the url
+            } catch (Exception e) {
+              toUrl = null;
+            }
+            if (toUrl != null)
+              crawlOut.append(new UTF8(toUrl),
+                              new CrawlDatum(CrawlDatum.STATUS_LINKED,
+                                             interval));
+          }
+        }
+        
+        public void close() throws IOException {
+          textOut.close();
+          dataOut.close();
+          crawlOut.close();
+        }
+        
+      };
+    
+  }
+
+}

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java 
Thu Aug 18 12:12:31 2005
@@ -29,19 +29,11 @@
 import java.util.logging.*;
 
 /* Parse content in a segment. */
-public class ParseSegment 
-  extends NutchConfigured implements Mapper, Reducer, OutputFormat {
+public class ParseSegment extends NutchConfigured implements Mapper, Reducer {
 
   public static final Logger LOG =
     LogFormatter.getLogger(Parser.class.getName());
 
-  public static final String SEGMENT_NAME_KEY = "nutch.segment.name";
-
-  private float interval;
-  private String segmentName;
-
-  private UrlNormalizer urlNormalizer = UrlNormalizerFactory.getNormalizer();
-        
   public ParseSegment() { super(null); }
 
   public ParseSegment(NutchConf conf) {
@@ -49,8 +41,6 @@
   }
 
   public void configure(JobConf job) {
-    interval = job.getFloat("db.default.fetch.interval", 30f);
-    segmentName = job.get(SEGMENT_NAME_KEY);
   }
 
   public void map(WritableComparable key, Writable value,
@@ -70,7 +60,6 @@
     }
 
     if (status.isSuccess()) {
-      parse.getData().getMetadata().setProperty(SEGMENT_NAME_KEY, segmentName);
       output.collect(key, new ParseImpl(parse.getText(), parse.getData()));
     } else {
       LOG.warning("Error parsing: "+key+": "+status.toString());
@@ -83,70 +72,12 @@
     output.collect(key, (Writable)values.next()); // collect first value
   }
 
-  public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
-                                      String name) throws IOException {
-    File text =
-      new File(new File(job.getOutputDir(), ParseText.DIR_NAME), name);
-    File data =
-      new File(new File(job.getOutputDir(), ParseData.DIR_NAME), name);
-    File crawl =
-      new File(new File(job.getOutputDir(), CrawlDatum.PARSE_DIR_NAME), name);
-    
-    final MapFile.Writer textOut =
-      new MapFile.Writer(fs, text.toString(), UTF8.class, ParseText.class);
-    
-    final MapFile.Writer dataOut =
-      new MapFile.Writer(fs, data.toString(), UTF8.class,ParseData.class,true);
-    
-    final SequenceFile.Writer crawlOut =
-      new SequenceFile.Writer(fs, crawl.toString(),
-                              UTF8.class, CrawlDatum.class);
-    
-    return new RecordWriter() {
-
-        public void write(WritableComparable key, Writable value)
-          throws IOException {
-          
-          Parse parse = (Parse)value;
-          
-          textOut.append(key, new ParseText(parse.getText()));
-          dataOut.append(key, parse.getData());
-
-          // collect outlinks for subsequent db update
-          Outlink[] links = parse.getData().getOutlinks();
-          for (int i = 0; i < links.length; i++) {
-            String toUrl = links[i].getToUrl();
-            try {
-              toUrl = urlNormalizer.normalize(toUrl); // normalize the url
-              toUrl = URLFilters.filter(toUrl);   // filter the url
-            } catch (Exception e) {
-              toUrl = null;
-            }
-            if (toUrl != null)
-              crawlOut.append(new UTF8(toUrl),
-                              new CrawlDatum(CrawlDatum.STATUS_LINKED,
-                                             interval));
-          }
-        }
-        
-        public void close() throws IOException {
-          textOut.close();
-          dataOut.close();
-          crawlOut.close();
-        }
-        
-      };
-    
-  }      
-
   public void parse(File segment) throws IOException {
     LOG.info("Parse: starting");
     LOG.info("Parse: segment: " + segment);
 
     JobConf job = new JobConf(getConf());
 
-    job.set(SEGMENT_NAME_KEY, segment.getName());
-
     job.setInputDir(new File(segment, Content.DIR_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
     job.setInputKeyClass(UTF8.class);
@@ -155,7 +86,7 @@
     job.setReducerClass(ParseSegment.class);
     
     job.setOutputDir(segment);
-    job.setOutputFormat(ParseSegment.class);
+    job.setOutputFormat(ParseOutputFormat.class);
     job.setOutputKeyClass(UTF8.class);
     job.setOutputValueClass(ParseImpl.class);
 

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java 
Thu Aug 18 12:12:31 2005
@@ -77,6 +77,7 @@
             throws IOException {
             outs[partitioner.getPartition(key, value, partitions)]
               .append(key, value);
+            reportProgress(umbilical);
           }
         };
 

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java 
Thu Aug 18 12:12:31 2005
@@ -246,6 +246,7 @@
         public void collect(WritableComparable key, Writable value)
           throws IOException {
           out.write(key, value);
+          reportProgress(umbilical);
         }
       };
     

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java?rev=233359&r1=233358&r2=233359&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java 
Thu Aug 18 12:12:31 2005
@@ -29,6 +29,10 @@
 
   public ParseImpl() {}
 
+  public ParseImpl(Parse parse) {
+    this(parse.getText(), parse.getData());
+  }
+
   public ParseImpl(String text, ParseData data) {
     this(new ParseText(text), data);
   }
@@ -53,6 +57,12 @@
 
     data = new ParseData();
     data.readFields(in);
+  }
+
+  public static ParseImpl read(DataInput in) throws IOException {
+    ParseImpl parseImpl = new ParseImpl();
+    parseImpl.readFields(in);
+    return parseImpl;
   }
 
 }


Reply via email to