Repository: hbase Updated Branches: refs/heads/master a03d09abd -> 4e821d491
HBASE-17851: WAL to HFile conversion phase MUST detect and handle missing WAL files Signed-off-by: tedyu <yuzhih...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4e821d49 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4e821d49 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4e821d49 Branch: refs/heads/master Commit: 4e821d491624dc1db8f4df1154b19c602b76cbfc Parents: a03d09a Author: Vladimir Rodionov <vrodio...@hortonworks.com> Authored: Thu Mar 8 14:22:28 2018 -0800 Committer: tedyu <yuzhih...@gmail.com> Committed: Thu Mar 8 15:28:27 2018 -0800 ---------------------------------------------------------------------- .../hbase/mapreduce/TestWALRecordReader.java | 81 +++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/4e821d49/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index e486714..449c4b7 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; @@ -241,6 +242,45 @@ public class TestWALRecordReader { testSplit(splits.get(1)); } + /** + * Test WALRecordReader tolerance to moving WAL from active + * to archive directory + * @throws Exception exception + */ + @Test + public void testWALRecordReaderActiveArchiveTolerance() throws Exception { + final WALFactory walfactory = new WALFactory(conf, getName()); + WAL log = walfactory.getWAL(info); + byte [] value = Bytes.toBytes("value"); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), + System.currentTimeMillis(), value)); + long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + log.sync(txid); + + Thread.sleep(10); // make sure 2nd edit gets a later timestamp + + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), + System.currentTimeMillis(), value)); + txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + log.sync(txid); + log.shutdown(); + + // should have 2 log entries now + WALInputFormat input = new WALInputFormat(); + Configuration jobConf = new Configuration(conf); + jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); + // make sure log is found + List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); + assertEquals(1, splits.size()); + WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0); + LOG.debug("log="+logDir+" file="+ split.getLogFileName()); + + testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2")); + + } + protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) { return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes); } @@ -270,4 +310,43 @@ public class TestWALRecordReader { assertFalse(reader.nextKeyValue()); reader.close(); } -} + + /** + * Create a new reader from the split, match the edits against the passed columns, + * moving WAL to archive in between readings + */ + private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception { + WALRecordReader<WALKey> reader = getReader(); + reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); + + assertTrue(reader.nextKeyValue()); + Cell cell = reader.getCurrentValue().getCells().get(0); + if (!Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())) { + assertTrue( + "expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString( + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]", + false); + } + // Move log file to archive directory + // While WAL record reader is open + WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split; + + Path logFile = new Path(split_.getLogFileName()); + Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); + boolean result = fs.rename(logFile, archivedLog); + assertTrue(result); + result = fs.exists(archivedLog); + assertTrue(result); + assertTrue(reader.nextKeyValue()); + cell = reader.getCurrentValue().getCells().get(0); + if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())) { + assertTrue( + "expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString( + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]", + false); + } + reader.close(); + } +} \ No newline at end of file