MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine
(cherry picked from commit 91db424c4360d7556660e8c57ac9a266e6688e01) (cherry picked from commit 7f354877889b343878a8a09792d5cec8d2846a50) (cherry picked from commit 9bd439e2c535b95ff0d2b5767b05a7ef43479298) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e062e2b0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e062e2b0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e062e2b0 Branch: refs/heads/branch-2.9 Commit: e062e2b08c56adb1fa7965a7de543810df6f5a91 Parents: 98499bb Author: Chris Douglas <cdoug...@apache.org> Authored: Fri Jan 26 09:06:48 2018 -0800 Committer: Chris Douglas <cdoug...@apache.org> Committed: Fri Jan 26 09:16:08 2018 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/hadoop/io/MapFile.java | 35 ++++++++++-- .../java/org/apache/hadoop/io/TestMapFile.java | 59 +++++++++++++++++++- 2 files changed, 88 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e062e2b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java index f9e0145..8373e01 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java @@ -811,15 +811,40 @@ public class MapFile { (LongWritable.class)); } try { - long pos = 0L; + /** What's the position (in bytes) we wrote when we got the last index */ + long lastIndexPos = -1; + /** + * What was size when we last wrote an index. 
Set to MIN_VALUE to ensure + * that we have an index at position zero - midKey will throw an exception + * if this is not the case + */ + long lastIndexKeyCount = Long.MIN_VALUE; + long pos = dataReader.getPosition(); LongWritable position = new LongWritable(); + long nextBlock = pos; + boolean blockCompressed = dataReader.isBlockCompressed(); while(dataReader.next(key, value)) { - cnt++; - if (cnt % indexInterval == 0) { + if (blockCompressed) { + long curPos = dataReader.getPosition(); + if (curPos > nextBlock) { + pos = nextBlock; // current block position + nextBlock = curPos; + } + } + // Follow the same logic as in + // {@link MapFile.Writer#append(WritableComparable, Writable)} + if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) { position.set(pos); - if (!dryrun) indexWriter.append(key, position); + if (!dryrun) { + indexWriter.append(key, position); + } + lastIndexPos = pos; + lastIndexKeyCount = cnt; + } + if (!blockCompressed) { + pos = dataReader.getPosition(); // next record position } - pos = dataReader.getPosition(); + cnt++; } } catch(Throwable t) { // truncated data file. swallow it. http://git-wip-us.apache.org/repos/asf/hadoop/blob/e062e2b0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java index ff8df7c..7ec4227 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java @@ -485,6 +485,63 @@ public class TestMapFile { IOUtils.cleanup(null, writer); } } + + /** + * test {@link MapFile#fix(FileSystem, Path, Class<? extends Writable>, + * Class<? 
extends Writable>, boolean, Configuration)} + * method in case of BLOCK compression + */ + @Test + public void testFixBlockCompress() throws Exception { + final String indexLessMapFile = "testFixBlockCompress.mapfile"; + final int compressBlocksize = 100; + final int indexInterval = 4; + final int noBlocks = 4; + final String value = "value-"; + final int size = noBlocks * compressBlocksize / (4 + value.length()); + + conf.setInt("io.seqfile.compress.blocksize", compressBlocksize); + MapFile.Writer.setIndexInterval(conf, indexInterval); + FileSystem fs = FileSystem.getLocal(conf); + Path dir = new Path(TEST_DIR, indexLessMapFile); + MapFile.Writer writer = null; + MapFile.Reader reader = null; + try { + writer = + new MapFile.Writer(conf, dir, + MapFile.Writer.keyClass(IntWritable.class), + MapFile.Writer.valueClass(Text.class), + MapFile.Writer.compression(CompressionType.BLOCK)); + for (int i = 0; i < size; i++) { + writer.append(new IntWritable(i), new Text(value + i)); + } + writer.close(); + Path index = new Path(dir, MapFile.INDEX_FILE_NAME); + fs.rename(index, index.suffix(".orig")); + + assertEquals("No of valid MapFile entries wrong", size, + MapFile.fix(fs, dir, IntWritable.class, Text.class, + false, conf)); + reader = new MapFile.Reader(dir, conf); + IntWritable key; + Text val = new Text(); + int notFound = 0; + for (int i = 0; i < size; i++) { + key = new IntWritable(i); + if (null == reader.get(key, val)) { + notFound++; + } + } + assertEquals("With MapFile.fix-ed index, could not get entries # ", + 0, notFound); + } finally { + IOUtils.cleanupWithLogger(null, writer, reader); + if (fs.exists(dir)) { + fs.delete(dir, true); + } + } + } + /** * test all available constructor for {@code MapFile.Writer} */ @@ -619,7 +676,7 @@ public class TestMapFile { } catch (Exception ex) { fail("testMainMethodMapFile error !!!"); } finally { - IOUtils.cleanup(null, writer); + IOUtils.cleanupWithLogger(null, writer); } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org