Author: cutting Date: Wed Jul 6 11:37:44 2005 New Revision: 209495 URL: http://svn.apache.org/viewcvs?rev=209495&view=rev Log: Add unit test for SequenceFile InputFormat. Fix code to pass unit test. SequenceFile now inserts sync marks after a fixed number of bytes rather than after a fixed number of entries.
Added: lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/ lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=209495&r1=209494&r2=209495&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Wed Jul 6 11:37:44 2005 @@ -39,10 +39,12 @@ }; private static final int SYNC_ESCAPE = -1; // "length" of sync entries - private static final int SYNC_INTERVAL = 10; // num entries between syncs private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash + /** The number of bytes between sync points.*/ + public static final int SYNC_INTERVAL = 100*SYNC_SIZE; + /** Write key/value pairs to a sequence-format file. */ public static class Writer { private NFSDataOutputStream out; @@ -56,7 +58,7 @@ // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record // starts and ends by scanning for this value. 
- private long count; // number of entries added + private long lastSyncPos; // position of last sync private final byte[] sync; // 16 random bytes { try { // use hash of uid + host @@ -145,7 +147,9 @@ if (keyLength == 0) throw new IOException("zero length keys not allowed"); - if ((++count % SYNC_INTERVAL) == 0) { // time to emit sync + if (out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync + lastSyncPos = out.getPos(); // update lastSyncPos + //LOG.info("sync@"+lastSyncPos); out.writeInt(SYNC_ESCAPE); // escape it out.write(sync); // write sync } @@ -297,6 +301,7 @@ int length = in.readInt(); if (version[3] > 1 && length == SYNC_ESCAPE) { // process a sync entry + //LOG.info("sync@"+in.getPos()); in.readFully(syncCheck); // read syncCheck if (!Arrays.equals(sync, syncCheck)) // check it throw new IOException("File is corrupt!"); @@ -318,12 +323,12 @@ /** Seek to the next sync mark past a given position.*/ public synchronized void sync(long position) throws IOException { - if (position+sync.length >= end) { + if (position+SYNC_SIZE >= end) { seek(end); return; } - seek(position); + seek(position+4); // skip escape in.readFully(syncCheck); int syncLen = sync.length; for (int i = 0; in.getPos() < end; i++) { @@ -332,8 +337,10 @@ if (sync[j] != syncCheck[(i+j)%syncLen]) break; } - if (j == syncLen) + if (j == syncLen) { + in.seek(in.getPos() - SYNC_SIZE); // position before sync return; + } syncCheck[i%syncLen] = in.readByte(); } } Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java?rev=209495&r1=209494&r2=209495&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java Wed Jul 6 11:37:44 2005 @@ -56,6 +56,7 
@@ /** The number of bytes in the file to process. */ public long getLength() { return length; } + public String toString() { return file + ":" + start + "+" + length; } //////////////////////////////////////////// // Writable methods Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java?rev=209495&r1=209494&r2=209495&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java Wed Jul 6 11:37:44 2005 @@ -21,14 +21,25 @@ import java.util.Arrays; import java.util.ArrayList; +import java.util.logging.Logger; import org.apache.nutch.fs.NutchFileSystem; +import org.apache.nutch.util.LogFormatter; /** A base class for {@link InputFormat}. 
*/ public abstract class InputFormatBase implements InputFormat { + public static final Logger LOG = + LogFormatter.getLogger("org.apache.nutch.mapred.InputFormatBase"); + private static final double SPLIT_SLOP = 0.1; // 10% slop + private int minSplitSize = 1; + + protected void setMinSplitSize(int minSplitSize) { + this.minSplitSize = minSplitSize; + } + public abstract RecordReader getRecordReader(NutchFileSystem fs, FileSplit split, JobConf job) throws IOException; @@ -84,8 +95,11 @@ totalSize += fs.getLength(files[i]); } - long bytesPerSplit = totalSize / numSplits; + long bytesPerSplit = Math.max(totalSize / numSplits, minSplitSize); long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP); + + //LOG.info("bytesPerSplit = " + bytesPerSplit); + //LOG.info("maxPerSplit = " + maxPerSplit); ArrayList splits = new ArrayList(numSplits); // generate splits for (int i = 0; i < files.length; i++) { Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=209495&r1=209494&r2=209495&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Wed Jul 6 11:37:44 2005 @@ -31,6 +31,10 @@ /** An {@link InputFormat} for {@link SequenceFile}s. 
*/ public class SequenceFileInputFormat extends InputFormatBase { + public SequenceFileInputFormat() { + setMinSplitSize(SequenceFile.SYNC_INTERVAL); + } + protected File[] listFiles(NutchFileSystem fs, JobConf job) throws IOException { Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java?rev=209495&r1=209494&r2=209495&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java Wed Jul 6 11:37:44 2005 @@ -38,7 +38,10 @@ this.in = new SequenceFile.Reader(fs, split.getFile().toString()); this.end = split.getStart() + split.getLength(); - in.sync(split.getStart()); // sync to start + if (split.getStart() > in.getPosition()) + in.sync(split.getStart()); // sync to start + + more = in.getPosition() < end; } Added: lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java?rev=209495&view=auto ============================================================================== --- lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java (added) +++ lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java Wed Jul 6 11:37:44 2005 @@ -0,0 +1,105 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.mapred; + +import java.io.*; +import java.util.*; +import junit.framework.TestCase; +import java.util.logging.*; + +import org.apache.nutch.fs.*; +import org.apache.nutch.io.*; +import org.apache.nutch.util.*; + +public class TestSequenceFileInputFormat extends TestCase { + private static final Logger LOG = InputFormatBase.LOG; + + private static int MAX_LENGTH = 10000; + + public void testFormat() throws Exception { + JobConf job = new JobConf(NutchConf.get()); + NutchFileSystem fs = NutchFileSystem.getNamed("local"); + File dir = new File(System.getProperty("test.build.data",".") + "/mrtest"); + File file = new File(dir, "test.seq"); + + int seed = new Random().nextInt(); + LOG.info("seed = "+seed); + Random random = new Random(seed); + + dir.mkdirs(); + job.setInputDir(dir); + + // for a variety of lengths + for (int length = 0; length < MAX_LENGTH; + length+= random.nextInt(MAX_LENGTH/10)+1) { + + LOG.info("creating; entries = " + length); + + // create a file with length entries + file.delete(); + SequenceFile.Writer writer = + new SequenceFile.Writer(fs, file.toString(), + IntWritable.class, BytesWritable.class); + try { + for (int i = 0; i < length; i++) { + IntWritable key = new IntWritable(i); + byte[] data = new byte[random.nextInt(10)]; + random.nextBytes(data); + BytesWritable value = new BytesWritable(data); + writer.append(key, value); + } + } finally { + writer.close(); + } + + // try splitting the file in a variety of sizes + InputFormat format = new SequenceFileInputFormat(); + IntWritable key = new 
IntWritable(); + BytesWritable value = new BytesWritable(); + for (int i = 0; i < 3; i++) { + int numSplits = + random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1; + LOG.info("splitting: requesting = " + numSplits); + FileSplit[] splits = format.getSplits(fs, job, numSplits); + LOG.info("splitting: got = " + splits.length); + + // check each split + BitSet bits = new BitSet(length); + for (int j = 0; j < splits.length; j++) { + RecordReader reader = format.getRecordReader(fs, splits[j], job); + int count = 0; + while (reader.next(key, value)) { +// if (bits.get(key.get())) { +// LOG.info("splits["+j+"]="+splits[j]+" : " + key.get()); +// LOG.info("@"+reader.getPos()); +// } + assertFalse("Key in multiple partitions.", bits.get(key.get())); + bits.set(key.get()); + count++; + } + //LOG.info("splits["+j+"]="+splits[j]+" count=" + count); + } + assertEquals("Some keys in no partition.", length, bits.cardinality()); + } + + } + } + + public static void main(String[] args) throws Exception { + new TestSequenceFileInputFormat().testFormat(); + } +}