Author: clamb Date: Thu May 29 22:27:25 2014 New Revision: 1598435 URL: http://svn.apache.org/r1598435 Log: merge from trunk r1598430
Added: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt - copied unchanged from r1598430, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2 - copied unchanged from r1598430, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2 Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed) hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1596816-1598430 Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt Thu May 29 22:27:25 2014 @@ -202,6 +202,8 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5809. Enhance distcp to support preserving HDFS ACLs. (cnauroth) + MAPREDUCE-5899. Support incremental data copy in DistCp. (jing9) + OPTIMIZATIONS BUG FIXES @@ -239,6 +241,12 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job history files generated by 2.0.3 history server (Rushabh S Shah via jlowe) + MAPREDUCE-5862. Line records longer than 2x split size aren't handled + correctly (bc Wong via jlowe) + + MAPREDUCE-5895. Close streams properly to avoid leakage in TaskLog. + (Kousuke Saruta via devaraj) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1596816-1598430 Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1596816-1598430 Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Thu May 29 22:27:25 2014 @@ -85,6 +85,15 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude> + </excludes> + </configuration> + </plugin> </plugins> </build> </project> Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Thu May 29 22:27:25 2014 @@ -184,7 +184,7 @@ public class LineRecordReader implements private int maxBytesToConsume(long pos) { return isCompressedInput() ? Integer.MAX_VALUE - : (int) Math.min(Integer.MAX_VALUE, end - pos); + : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength); } private long getFilePosition() throws IOException { @@ -206,8 +206,7 @@ public class LineRecordReader implements while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { key.set(pos); - int newSize = in.readLine(value, maxLineLength, - Math.max(maxBytesToConsume(pos), maxLineLength)); + int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); if (newSize == 0) { return false; } Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Thu May 29 22:27:25 2014 @@ -199,16 +199,18 @@ public class TaskLog { // file first and then rename. File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup); - BufferedOutputStream bos = - new BufferedOutputStream( - SecureIOUtils.createForWrite(tmpIndexFile, 0644)); - DataOutputStream dos = new DataOutputStream(bos); - //the format of the index file is - //LOG_DIR: <the dir where the task logs are really stored> - //STDOUT: <start-offset in the stdout file> <length> - //STDERR: <start-offset in the stderr file> <length> - //SYSLOG: <start-offset in the syslog file> <length> + BufferedOutputStream bos = null; + DataOutputStream dos = null; try{ + bos = new BufferedOutputStream( + SecureIOUtils.createForWrite(tmpIndexFile, 0644)); + dos = new DataOutputStream(bos); + //the format of the index file is + //LOG_DIR: <the dir where the task logs are really stored> + //STDOUT: <start-offset in the stdout file> <length> + //STDERR: <start-offset in the stderr file> <length> + //SYSLOG: <start-offset in the syslog file> <length> + dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n" + LogName.STDOUT.toString() + ":"); dos.writeBytes(Long.toString(prevOutLength) + " "); @@ -225,8 +227,10 @@ public class TaskLog { + "\n"); dos.close(); dos = null; + bos.close(); + bos = null; } finally { - IOUtils.cleanup(LOG, dos); + IOUtils.cleanup(LOG, dos, bos); } File indexFile = getIndexFile(currentTaskid, isCleanup); Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Thu May 29 22:27:25 2014 @@ -121,7 +121,7 @@ public class LineRecordReader extends Re private int maxBytesToConsume(long pos) { return isCompressedInput ? Integer.MAX_VALUE - : (int) Math.min(Integer.MAX_VALUE, end - pos); + : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength); } private long getFilePosition() throws IOException { @@ -146,8 +146,7 @@ public class LineRecordReader extends Re // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { - newSize = in.readLine(value, maxLineLength, - Math.max(maxBytesToConsume(pos), maxLineLength)); + newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); pos += newSize; if (newSize < maxLineLength) { break; Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1596816-1598430 Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm Thu May 29 22:27:25 2014 @@ -18,8 +18,6 @@ Hadoop MapReduce Next Generation - Distributed Cache Deploy - \[ {{{./index.html}Go Back}} \] - * Introduction The MapReduce application framework has rudimentary support for deploying a Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm Thu May 29 22:27:25 2014 @@ -18,8 +18,6 @@ Hadoop MapReduce Next Generation - Encrypted Shuffle - \[ {{{./index.html}Go Back}} \] - * {Introduction} The Encrypted Shuffle capability allows encryption of the MapReduce shuffle Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm Thu May 29 22:27:25 2014 @@ -18,8 +18,6 @@ Apache Hadoop MapReduce - Migrating from Apache Hadoop 1.x to Apache Hadoop 2.x - \[ {{{../../hadoop-yarn/hadoop-yarn-site/index.html}Go Back}} \] - * {Introduction} This document provides information for users to migrate their Apache Hadoop Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm Thu May 29 22:27:25 2014 @@ -18,8 +18,6 @@ Hadoop MapReduce Next Generation - Pluggable Shuffle and Pluggable Sort - \[ {{{./index.html}Go Back}} \] - * Introduction The pluggable shuffle and pluggable sort capabilities allow replacing the Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java Thu May 29 22:27:25 2014 @@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.net.URL; +import java.util.ArrayList; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; @@ -97,4 +100,92 @@ public class TestLineRecordReader { // character is a linefeed testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498); } + + // Use the LineRecordReader to read records from the file + public ArrayList<String> readRecords(URL testFileUrl, int splitSize) + throws IOException { + + // Set up context + File testFile = new File(testFileUrl.getFile()); + long testFileSize = testFile.length(); + Path testFilePath = new Path(testFile.getAbsolutePath()); + Configuration conf = new Configuration(); + conf.setInt("io.file.buffer.size", 1); + + // Gather the records returned by the record reader + ArrayList<String> records = new ArrayList<String>(); + + long offset = 0; + LongWritable key = new LongWritable(); + Text value = new Text(); + while (offset < testFileSize) { + FileSplit split = + new FileSplit(testFilePath, offset, splitSize, (String[]) null); + LineRecordReader reader = new LineRecordReader(conf, split); + + while (reader.next(key, value)) { + records.add(value.toString()); + } + offset += splitSize; + } + return records; + } + + // Gather the records by just splitting on new lines + public String[] readRecordsDirectly(URL testFileUrl, boolean bzip) + throws IOException { + int MAX_DATA_SIZE = 1024 * 1024; + byte[] data = new byte[MAX_DATA_SIZE]; + FileInputStream fis = new FileInputStream(testFileUrl.getFile()); + int count; + if (bzip) { + BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis); + count = bzIn.read(data); + bzIn.close(); + } else { + count = fis.read(data); + } + fis.close(); + assertTrue("Test file data too big for buffer", count < data.length); + return new String(data, 0, count, "UTF-8").split("\n"); + } + + public void checkRecordSpanningMultipleSplits(String testFile, + int splitSize, + boolean bzip) + throws IOException { + URL testFileUrl = getClass().getClassLoader().getResource(testFile); + ArrayList<String> records = readRecords(testFileUrl, splitSize); + String[] actuals = readRecordsDirectly(testFileUrl, bzip); + + assertEquals("Wrong number of records", actuals.length, records.size()); + + boolean hasLargeRecord = false; + for (int i = 0; i < actuals.length; ++i) { + assertEquals(actuals[i], records.get(i)); + if (actuals[i].length() > 2 * splitSize) { + hasLargeRecord = true; + } + } + + assertTrue("Invalid test data. Doesn't have a large enough record", + hasLargeRecord); + } + + @Test + public void testRecordSpanningMultipleSplits() + throws IOException { + checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt", + 10, false); + } + + @Test + public void testRecordSpanningMultipleSplitsCompressed() + throws IOException { + // The file is generated with bz2 block size of 100k. The split size + // needs to be larger than that for the CompressedSplitLineReader to + // work. + checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2", + 200 * 1000, true); + } } Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java?rev=1598435&r1=1598434&r2=1598435&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java Thu May 29 22:27:25 2014 @@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.net.URL; +import java.util.ArrayList; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -101,4 +104,93 @@ public class TestLineRecordReader { // character is a linefeed testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498); } + + // Use the LineRecordReader to read records from the file + public ArrayList<String> readRecords(URL testFileUrl, int splitSize) + throws IOException { + + // Set up context + File testFile = new File(testFileUrl.getFile()); + long testFileSize = testFile.length(); + Path testFilePath = new Path(testFile.getAbsolutePath()); + Configuration conf = new Configuration(); + conf.setInt("io.file.buffer.size", 1); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + + // Gather the records returned by the record reader + ArrayList<String> records = new ArrayList<String>(); + + long offset = 0; + while (offset < testFileSize) { + FileSplit split = new FileSplit(testFilePath, offset, splitSize, null); + LineRecordReader reader = new LineRecordReader(); + reader.initialize(split, context); + + while (reader.nextKeyValue()) { + records.add(reader.getCurrentValue().toString()); + } + offset += splitSize; + } + return records; + } + + // Gather the records by just splitting on new lines + public String[] readRecordsDirectly(URL testFileUrl, boolean bzip) + throws IOException { + int MAX_DATA_SIZE = 1024 * 1024; + byte[] data = new byte[MAX_DATA_SIZE]; + FileInputStream fis = new FileInputStream(testFileUrl.getFile()); + int count; + if (bzip) { + BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis); + count = bzIn.read(data); + bzIn.close(); + } else { + count = fis.read(data); + } + fis.close(); + assertTrue("Test file data too big for buffer", count < data.length); + return new String(data, 0, count, "UTF-8").split("\n"); + } + + public void checkRecordSpanningMultipleSplits(String testFile, + int splitSize, + boolean bzip) + throws IOException { + URL testFileUrl = getClass().getClassLoader().getResource(testFile); + ArrayList<String> records = readRecords(testFileUrl, splitSize); + String[] actuals = readRecordsDirectly(testFileUrl, bzip); + + assertEquals("Wrong number of records", actuals.length, records.size()); + + boolean hasLargeRecord = false; + for (int i = 0; i < actuals.length; ++i) { + assertEquals(actuals[i], records.get(i)); + if (actuals[i].length() > 2 * splitSize) { + hasLargeRecord = true; + } + } + + assertTrue("Invalid test data. Doesn't have a large enough record", + hasLargeRecord); + } + + @Test + public void testRecordSpanningMultipleSplits() + throws IOException { + checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt", + 10, + false); + } + + @Test + public void testRecordSpanningMultipleSplitsCompressed() + throws IOException { + // The file is generated with bz2 block size of 100k. The split size + // needs to be larger than that for the CompressedSplitLineReader to + // work. + checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2", + 200 * 1000, + true); + } }