Author: siren Date: Thu Jan 25 10:11:59 2007 New Revision: 499878 URL: http://svn.apache.org/viewvc?view=rev&rev=499878 Log: NUTCH-433
Modified: lucene/nutch/trunk/CHANGES.txt lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java Modified: lucene/nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=499878&r1=499877&r2=499878 ============================================================================== --- lucene/nutch/trunk/CHANGES.txt (original) +++ lucene/nutch/trunk/CHANGES.txt Thu Jan 25 10:11:59 2007 @@ -139,6 +139,9 @@ 45. NUTCH-68 - Add a tool to generate arbitrary fetchlists. (ab) +46. NUTCH-433 - java.io.EOFException in newer nightlies in mergesegs + or indexing from hadoop.io.DataOutputBuffer (siren) + Release 0.8 - 2006-07-25 Modified: lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java?view=diff&rev=499878&r1=499877&r2=499878 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java Thu Jan 25 10:11:59 2007 @@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.*; -import org.apache.nutch.fetcher.Fetcher; import org.apache.hadoop.fs.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.mapred.*; @@ -51,41 +50,12 @@ import org.apache.nutch.metadata.Nutch; /** Create indexes for segments. */ -public class Indexer extends ToolBase implements Reducer { +public class Indexer extends ToolBase implements Reducer, Mapper { public static final String DONE_NAME = "index.done"; public static final Log LOG = LogFactory.getLog(Indexer.class); - /** Wraps inputs in an {@link ObjectWritable}, to permit merging different - * types in reduce. 
*/ - public static class InputFormat extends SequenceFileInputFormat { - public RecordReader getRecordReader(FileSystem fs, FileSplit split, - JobConf job, Reporter reporter) - throws IOException { - - reporter.setStatus(split.toString()); - - return new SequenceFileRecordReader(job, split) { - public synchronized boolean next(Writable key, Writable value) - throws IOException { - ObjectWritable wrapper = (ObjectWritable)value; - try { - wrapper.set(getValueClass().newInstance()); - } catch (Exception e) { - throw new IOException(e.toString()); - } - return super.next(key, (Writable)wrapper.get()); - } - - // override the default - we want ObjectWritable-s here - public Writable createValue() { - return new ObjectWritable(); - } - }; - } - } - /** Unwrap Lucene Documents created by reduce and add them to an index. */ public static class OutputFormat extends org.apache.hadoop.mapred.OutputFormatBase { @@ -290,12 +260,9 @@ job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME)); job.addInputPath(new Path(linkDb, LinkDb.CURRENT_NAME)); + job.setInputFormat(SequenceFileInputFormat.class); - job.setInputFormat(InputFormat.class); - //job.setInputKeyClass(Text.class); - //job.setInputValueClass(ObjectWritable.class); - - //job.setCombinerClass(Indexer.class); + job.setMapperClass(Indexer.class); job.setReducerClass(Indexer.class); job.setOutputPath(indexDir); @@ -332,6 +299,11 @@ LOG.fatal("Indexer: " + StringUtils.stringifyException(e)); return -1; } + } + + public void map(WritableComparable key, Writable value, + OutputCollector output, Reporter reporter) throws IOException { + output.collect(key, new ObjectWritable(value)); } } Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?view=diff&rev=499878&r1=499877&r2=499878 ============================================================================== --- 
lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Thu Jan 25 10:11:59 2007 @@ -32,9 +32,7 @@ import org.apache.hadoop.util.Progressable; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Generator; -import org.apache.nutch.fetcher.Fetcher; import org.apache.nutch.metadata.MetaWrapper; -import org.apache.nutch.metadata.Metadata; import org.apache.nutch.metadata.Nutch; import org.apache.nutch.net.URLFilters; import org.apache.nutch.parse.ParseData; @@ -104,37 +102,61 @@ * types in reduce and use additional metadata. */ public static class ObjectInputFormat extends SequenceFileInputFormat { - public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) - throws IOException { - - reporter.setStatus(split.toString()); + + @Override + public RecordReader getRecordReader(InputSplit split, + JobConf job, Reporter reporter) { + + try{ + reporter.setStatus(split.toString()); + } catch (IOException e) { + throw new RuntimeException("Cannot set status for reported:", e); + } // find part name - final SegmentPart segmentPart = SegmentPart.get(split); - final String spString = segmentPart.toString(); - - return new SequenceFileRecordReader(job, split) { - public synchronized boolean next(Writable key, Writable value) throws IOException { - MetaWrapper wrapper = (MetaWrapper) value; - try { - wrapper.set(getValueClass().newInstance()); - } catch (Exception e) { - throw new IOException(e.toString()); + SegmentPart segmentPart; + final String spString; + try { + segmentPart = SegmentPart.get((FileSplit) split); + spString = segmentPart.toString(); + } catch (IOException e) { + throw new RuntimeException("Cannot identify segment:", e); + } + + try { + return new SequenceFileRecordReader(job, (FileSplit)split) { + + @Override + public synchronized boolean next(Writable key, Writable value) throws IOException { + 
LOG.debug("Running OIF.next()"); + + MetaWrapper wrapper = (MetaWrapper) value; + try { + wrapper.set(getValueClass().newInstance()); + } catch (Exception e) { + throw new IOException(e.toString()); + } + + boolean res = super.next(key, (Writable) wrapper.get()); + wrapper.setMeta(SEGMENT_PART_KEY, spString); + return res; + } + + @Override + public Writable createValue() { + return new MetaWrapper(); } - boolean res = super.next(key, (Writable) wrapper.get()); - wrapper.setMeta(SEGMENT_PART_KEY, spString); - return res; - } - - public Writable createValue() { - return new MetaWrapper(); - } - }; + + }; + } catch (IOException e) { + throw new RuntimeException("Cannot create RecordReader: ", e); + } } } public static class SegmentOutputFormat extends OutputFormatBase { private static final String DEFAULT_SLICE = "default"; + @Override public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException { return new RecordWriter() { MapFile.Writer c_out = null; Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java?view=diff&rev=499878&r1=499877&r2=499878 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java Thu Jan 25 10:11:59 2007 @@ -47,33 +47,6 @@ private boolean co, fe, ge, pa, pd, pt; private FileSystem fs; - /** - * Wraps inputs in an {@link ObjectWritable}, to permit merging different - * types in reduce. 
- */ - public static class InputFormat extends SequenceFileInputFormat { - public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) - throws IOException { - reporter.setStatus(split.toString()); - - return new SequenceFileRecordReader(job, split) { - public synchronized boolean next(Writable key, Writable value) throws IOException { - ObjectWritable wrapper = (ObjectWritable) value; - try { - wrapper.set(getValueClass().newInstance()); - } catch (Exception e) { - throw new IOException(e.toString()); - } - return super.next(key, (Writable) wrapper.get()); - } - - public Writable createValue() { - return new ObjectWritable(); - } - }; - } - } - public static class InputCompatMapper extends MapReduceBase implements Mapper { private Text newKey = new Text(); @@ -83,7 +56,7 @@ newKey.set(key.toString()); key = newKey; } - collector.collect(key, value); + collector.collect(key, new ObjectWritable(value)); } } @@ -198,7 +171,7 @@ if (pd) job.addInputPath(new Path(segment, ParseData.DIR_NAME)); if (pt) job.addInputPath(new Path(segment, ParseText.DIR_NAME)); - job.setInputFormat(InputFormat.class); + job.setInputFormat(SequenceFileInputFormat.class); job.setMapperClass(InputCompatMapper.class); job.setReducerClass(SegmentReader.class); ------------------------------------------------------------------------- Take Surveys. Earn Cash. Influence the Future of IT Join SourceForge.net's Techsay panel and you'll get the chance to share your opinions on IT & business topics through brief surveys - and earn cash http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV _______________________________________________ Nutch-cvs mailing list Nutch-cvs@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/nutch-cvs