Author: suresh
Date: Wed Dec 12 06:37:49 2012
New Revision: 1420536

URL: http://svn.apache.org/viewvc?rev=1420536&view=rev
Log:
HADOOP-7096. Merge 1420535 from branch-1
Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/LineReader.java Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1420536&r1=1420535&r2=1420536&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original) +++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Wed Dec 12 06:37:49 2012 @@ -292,3 +292,6 @@ Branch-hadoop-1-win (branched from branc HADOOP-8617. Backport HADOOP-6148, HADOOP-6166 and HADOOP-7333 for a pure Java CRC32 calculator implementation. (Brandon Li via szetszwo) + + HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat + (Ahmed Radwan, backported by suresh) Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/LineReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/LineReader.java?rev=1420536&r1=1420535&r2=1420536&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/LineReader.java (original) +++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/LineReader.java Wed Dec 12 06:37:49 2012 @@ -26,6 +26,14 @@ import org.apache.hadoop.io.Text; /** * A class that provides a line reader from an input stream. + * Depending on the constructor used, lines will either be terminated by: + * <ul> + * <li>one of the following: '\n' (LF) , '\r' (CR), + * or '\r\n' (CR+LF).</li> + * <li><em>or</em>, a custom byte sequence delimiter</li> + * </ul> + * In both cases, EOF also terminates an otherwise unterminated + * line. 
*/ public class LineReader { private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; @@ -40,6 +48,9 @@ public class LineReader { private static final byte CR = '\r'; private static final byte LF = '\n'; + // The line delimiter + private final byte[] recordDelimiterBytes; + /** * Create a line reader that reads from the given stream using the * default buffer-size (64k). @@ -61,6 +72,7 @@ public class LineReader { this.in = in; this.bufferSize = bufferSize; this.buffer = new byte[this.bufferSize]; + this.recordDelimiterBytes = null; } /** @@ -76,6 +88,56 @@ public class LineReader { } /** + * Create a line reader that reads from the given stream using the + * default buffer-size, and using a custom delimiter of array of + * bytes. + * @param in The input stream + * @param recordDelimiterBytes The delimiter + */ + public LineReader(InputStream in, byte[] recordDelimiterBytes) { + this.in = in; + this.bufferSize = DEFAULT_BUFFER_SIZE; + this.buffer = new byte[this.bufferSize]; + this.recordDelimiterBytes = recordDelimiterBytes; + } + + /** + * Create a line reader that reads from the given stream using the + * given buffer-size, and using a custom delimiter of array of + * bytes. + * @param in The input stream + * @param bufferSize Size of the read buffer + * @param recordDelimiterBytes The delimiter + * @throws IOException + */ + public LineReader(InputStream in, int bufferSize, + byte[] recordDelimiterBytes) { + this.in = in; + this.bufferSize = bufferSize; + this.buffer = new byte[this.bufferSize]; + this.recordDelimiterBytes = recordDelimiterBytes; + } + + /** + * Create a line reader that reads from the given stream using the + * <code>io.file.buffer.size</code> specified in the given + * <code>Configuration</code>, and using a custom delimiter of array of + * bytes. 
+ * @param in input stream + * @param conf configuration + * @param recordDelimiterBytes The delimiter + * @throws IOException + */ + public LineReader(InputStream in, Configuration conf, + byte[] recordDelimiterBytes) throws IOException { + this.in = in; + this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE); + this.buffer = new byte[this.bufferSize]; + this.recordDelimiterBytes = recordDelimiterBytes; + } + + + /** * Close the underlying stream. * @throws IOException */ @@ -84,10 +146,7 @@ public class LineReader { } /** - * Read one line from the InputStream into the given Text. A line - * can be terminated by one of the following: '\n' (LF) , '\r' (CR), - * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated - * line. + * Read one line from the InputStream into the given Text. * * @param str the object to store the given line (without newline) * @param maxLineLength the maximum number of bytes to store into str; @@ -104,6 +163,18 @@ public class LineReader { */ public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { + if (this.recordDelimiterBytes != null) { + return readCustomLine(str, maxLineLength, maxBytesToConsume); + } else { + return readDefaultLine(str, maxLineLength, maxBytesToConsume); + } + } + + /** + * Read a line terminated by one of CR, LF, or CRLF. + */ + private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume) + throws IOException { /* We're reading data from in, but the head of the stream may be * already buffered in buffer, so we have several cases: * 1. No newline characters are in the buffer, so we need to copy @@ -167,6 +238,52 @@ public class LineReader { } /** + * Read a line terminated by a custom delimiter. 
+ */ + private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) + throws IOException { + str.clear(); + int txtLength = 0; // tracks str.getLength(), as an optimization + long bytesConsumed = 0; + int delPosn = 0; + do { + int startPosn = bufferPosn; // starting from where we left off the last + // time + if (bufferPosn >= bufferLength) { + startPosn = bufferPosn = 0; + bufferLength = in.read(buffer); + if (bufferLength <= 0) + break; // EOF + } + for (; bufferPosn < bufferLength; ++bufferPosn) { + if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) { + delPosn++; + if (delPosn >= recordDelimiterBytes.length) { + bufferPosn++; + break; + } + } else { + delPosn = 0; + } + } + int readLength = bufferPosn - startPosn; + bytesConsumed += readLength; + int appendLength = readLength - delPosn; + if (appendLength > maxLineLength - txtLength) { + appendLength = maxLineLength - txtLength; + } + if (appendLength > 0) { + str.append(buffer, startPosn, appendLength); + txtLength += appendLength; + } + } while (delPosn < recordDelimiterBytes.length + && bytesConsumed < maxBytesToConsume); + if (bytesConsumed > (long) Integer.MAX_VALUE) + throw new IOException("Too many bytes before delimiter: " + bytesConsumed); + return (int) bytesConsumed; + } + + /** * Read from the InputStream into the given Text. * @param str the object to store the given line * @param maxLineLength the maximum number of bytes to store into str.