This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new e26fd95b6ff HBASE-29183 Fix flakeyness on TestVerifyBucketCacheFile (#6787)
e26fd95b6ff is described below
commit e26fd95b6ff0131d861e9015d6a7876aabcde1d3
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Sun Mar 16 13:25:12 2025 +0000
HBASE-29183 Fix flakeyness on TestVerifyBucketCacheFile (#6787)
Signed-off-by: Duo Zhang <[email protected]>
---
.../io/hfile/bucket/TestVerifyBucketCacheFile.java | 491 ++++++++++++---------
1 file changed, 278 insertions(+), 213 deletions(-)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index a618965323d..7f2ddd8c340 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -98,66 +98,81 @@ public class TestVerifyBucketCacheFile {
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
- BucketCache bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertTrue(bucketCache.waitForCacheInitialization(10000));
- long usedSize = bucketCache.getAllocator().getUsedSize();
- assertEquals(0, usedSize);
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
- }
- usedSize = bucketCache.getAllocator().getUsedSize();
- assertNotEquals(0, usedSize);
- // 1.persist cache to file
- bucketCache.shutdown();
- // restore cache from file
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertTrue(bucketCache.waitForCacheInitialization(10000));
- assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
- // persist cache to file
- bucketCache.shutdown();
-
- // 2.delete bucket cache file
- final java.nio.file.Path cacheFile =
- FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
- assertTrue(Files.deleteIfExists(cacheFile));
- // can't restore cache from file
- final BucketCache recoveredBucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
- Waiter.waitFor(HBaseConfiguration.create(), 1000,
- () -> recoveredBucketCache.getBackingMapValidated().get());
- assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
- assertEquals(0, recoveredBucketCache.backingMap.size());
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, block.getBlockName(),
- block.getBlock());
- }
- usedSize = recoveredBucketCache.getAllocator().getUsedSize();
- assertNotEquals(0, usedSize);
- // persist cache to file
- recoveredBucketCache.shutdown();
-
- // 3.delete backingMap persistence file
- final java.nio.file.Path mapFile =
- FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
- assertTrue(Files.deleteIfExists(mapFile));
- // can't restore cache from file
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertTrue(bucketCache.waitForCacheInitialization(10000));
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
+ Configuration conf = HBaseConfiguration.create();
+ // Disables the persister thread by setting its interval to MAX_VALUE
+ conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+ BucketCache bucketCache = null;
+ BucketCache recoveredBucketCache = null;
+ try {
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertEquals(0, usedSize);
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertNotEquals(0, usedSize);
+ // 1.persist cache to file
+ bucketCache.shutdown();
+ // restore cache from file
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ waitPersistentCacheValidation(conf, bucketCache);
+ assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+ // persist cache to file
+ bucketCache.shutdown();
+
+ // 2.delete bucket cache file
+ final java.nio.file.Path cacheFile =
+ FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
+ assertTrue(Files.deleteIfExists(cacheFile));
+ // can't restore cache from file
+ recoveredBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
+ waitPersistentCacheValidation(conf, recoveredBucketCache);
+ assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
+ assertEquals(0, recoveredBucketCache.backingMap.size());
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, block.getBlockName(),
+ block.getBlock());
+ }
+ usedSize = recoveredBucketCache.getAllocator().getUsedSize();
+ assertNotEquals(0, usedSize);
+ // persist cache to file
+ recoveredBucketCache.shutdown();
+
+ // 3.delete backingMap persistence file
+ final java.nio.file.Path mapFile =
+ FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
+ assertTrue(Files.deleteIfExists(mapFile));
+ // can't restore cache from file
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ waitPersistentCacheValidation(conf, bucketCache);
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ } finally {
+ if (bucketCache != null) {
+ bucketCache.shutdown();
+ }
+ if (recoveredBucketCache != null) {
+ recoveredBucketCache.shutdown();
+ }
+ }
TEST_UTIL.cleanupTestDir();
}
@@ -169,34 +184,41 @@ public class TestVerifyBucketCacheFile {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
- BucketCache bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
- assertTrue(bucketCache.waitForCacheInitialization(10000));
-
- long usedSize = bucketCache.getAllocator().getUsedSize();
- assertEquals(0, usedSize);
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ BucketCache bucketCache = null;
+ try {
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertEquals(0, usedSize);
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertNotEquals(0, usedSize);
+ // Shutdown BucketCache
+ bucketCache.shutdown();
+ // Delete the persistence file
+ File mapFile = new File(mapFileName);
+ assertTrue(mapFile.delete());
+ Thread.sleep(350);
+ // Create BucketCache
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ waitPersistentCacheValidation(conf, bucketCache);
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ } finally {
+ if (bucketCache != null) {
+ bucketCache.shutdown();
+ }
}
- usedSize = bucketCache.getAllocator().getUsedSize();
- assertNotEquals(0, usedSize);
- // Shutdown BucketCache
- bucketCache.shutdown();
- // Delete the persistence file
- File mapFile = new File(mapFileName);
- assertTrue(mapFile.delete());
- Thread.sleep(350);
- // Create BucketCache
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
- assertTrue(bucketCache.waitForCacheInitialization(10000));
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
}
/**
@@ -215,38 +237,45 @@ public class TestVerifyBucketCacheFile {
Configuration conf = HBaseConfiguration.create();
// Disables the persister thread by setting its interval to MAX_VALUE
conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
- BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
- testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
- assertTrue(bucketCache.waitForCacheInitialization(10000));
- long usedSize = bucketCache.getAllocator().getUsedSize();
- assertEquals(0, usedSize);
-
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ BucketCache bucketCache = null;
+ try {
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertEquals(0, usedSize);
+
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertNotEquals(0, usedSize);
+ // persist cache to file
+ bucketCache.shutdown();
+
+ // modified bucket cache file
+ String file = testDir + "/bucket.cache";
+ try (BufferedWriter out =
+ new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
+ out.write("test bucket cache");
+ }
+ // can't restore cache from file
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ waitPersistentCacheValidation(conf, bucketCache);
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ } finally {
+ if (bucketCache != null) {
+ bucketCache.shutdown();
+ }
}
- usedSize = bucketCache.getAllocator().getUsedSize();
- assertNotEquals(0, usedSize);
- // persist cache to file
- bucketCache.shutdown();
-
- // modified bucket cache file
- String file = testDir + "/bucket.cache";
- try (BufferedWriter out =
- new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
- out.write("test bucket cache");
- }
- // can't restore cache from file
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertTrue(bucketCache.waitForCacheInitialization(10000));
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
-
TEST_UTIL.cleanupTestDir();
}
@@ -269,41 +298,50 @@ public class TestVerifyBucketCacheFile {
HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
-
- BucketCache bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertTrue(bucketCache.waitForCacheInitialization(10000));
- long usedSize = bucketCache.getAllocator().getUsedSize();
- assertEquals(0, usedSize);
-
- Pair<String, Long> myPair = new Pair<>();
-
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ Configuration conf = HBaseConfiguration.create();
+ // Disables the persister thread by setting its interval to MAX_VALUE
+ conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+ BucketCache bucketCache = null;
+ try {
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertEquals(0, usedSize);
+
+ Pair<String, Long> myPair = new Pair<>();
+
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertNotEquals(0, usedSize);
+ long blockCount = bucketCache.backingMap.size();
+ assertNotEquals(0, blockCount);
+ // persist cache to file
+ bucketCache.shutdown();
+
+ // modified bucket cache file LastModifiedTime
+ final java.nio.file.Path file =
+ FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
+ Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
+ // can't restore cache from file
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ waitPersistentCacheValidation(conf, bucketCache);
+ assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+ assertEquals(blockCount, bucketCache.backingMap.size());
+ } finally {
+ if (bucketCache != null) {
+ bucketCache.shutdown();
+ }
}
- usedSize = bucketCache.getAllocator().getUsedSize();
- assertNotEquals(0, usedSize);
- long blockCount = bucketCache.backingMap.size();
- assertNotEquals(0, blockCount);
- // persist cache to file
- bucketCache.shutdown();
-
- // modified bucket cache file LastModifiedTime
- final java.nio.file.Path file =
- FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
- Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
- // can't restore cache from file
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertTrue(bucketCache.waitForCacheInitialization(10000));
- assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
- assertEquals(blockCount, bucketCache.backingMap.size());
-
TEST_UTIL.cleanupTestDir();
}
@@ -324,43 +362,54 @@ public class TestVerifyBucketCacheFile {
// Disables the persister thread by setting its interval to MAX_VALUE
conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
- BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
- DEFAULT_ERROR_TOLERATION_DURATION, conf);
- assertTrue(bucketCache.waitForCacheInitialization(10000));
-
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
- // Add three blocks
- cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
- cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
- cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
- // saves the current state
- bucketCache.persistToFile();
- // evicts first block
- bucketCache.evictBlock(blocks[0].getBlockName());
-
- // now adds a fourth block to bucket cache
- cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
- // Creates new bucket cache instance without persisting to file after evicting first block
- // and caching fourth block. So the bucket cache file has only the last three blocks,
- // but backing map (containing cache keys) was persisted when first three blocks
- // were in the cache. So the state on this recovery is:
- // - Backing map: [block0, block1, block2]
- // - Cache: [block1, block2, block3]
- // Therefore, this bucket cache would be able to recover only block1 and block2.
- BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
- DEFAULT_ERROR_TOLERATION_DURATION, conf);
- assertTrue(newBucketCache.waitForCacheInitialization(10000));
-
- assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
- assertEquals(blocks[1].getBlock(),
- newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
- assertEquals(blocks[2].getBlock(),
- newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
- assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
- assertEquals(2, newBucketCache.backingMap.size());
+ BucketCache bucketCache = null;
+ BucketCache newBucketCache = null;
+ try {
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
+ // Add three blocks
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+ // saves the current state
+ bucketCache.persistToFile();
+ // evicts first block
+ bucketCache.evictBlock(blocks[0].getBlockName());
+
+ // now adds a fourth block to bucket cache
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+ // Creates new bucket cache instance without persisting to file after evicting first block
+ // and caching fourth block. So the bucket cache file has only the last three blocks,
+ // but backing map (containing cache keys) was persisted when first three blocks
+ // were in the cache. So the state on this recovery is:
+ // - Backing map: [block0, block1, block2]
+ // - Cache: [block1, block2, block3]
+ // Therefore, this bucket cache would be able to recover only block1 and block2.
+ newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(newBucketCache.waitForCacheInitialization(10000));
+ waitPersistentCacheValidation(conf, newBucketCache);
+ assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
+ assertEquals(blocks[1].getBlock(),
+ newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+ assertEquals(blocks[2].getBlock(),
+ newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+ assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+ assertEquals(2, newBucketCache.backingMap.size());
+ } finally {
+ if (bucketCache != null) {
+ bucketCache.shutdown();
+ }
+ if (newBucketCache != null) {
+ newBucketCache.shutdown();
+ }
+ }
TEST_UTIL.cleanupTestDir();
}
@@ -389,31 +438,43 @@ public class TestVerifyBucketCacheFile {
conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);
String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
- BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
- DEFAULT_ERROR_TOLERATION_DURATION, conf);
- assertTrue(bucketCache.waitForCacheInitialization(10000));
-
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
-
- for (int i = 0; i < numBlocks; i++) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), blocks[i].getBlock());
- }
-
- // saves the current state
- bucketCache.persistToFile();
-
- // Create a new bucket which reads from persistence file.
- BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
- DEFAULT_ERROR_TOLERATION_DURATION, conf);
- assertTrue(newBucketCache.waitForCacheInitialization(10000));
- assertEquals(numBlocks, newBucketCache.backingMap.size());
-
- for (int i = 0; i < numBlocks; i++) {
- assertEquals(blocks[i].getBlock(),
- newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
+ BucketCache bucketCache = null;
+ BucketCache newBucketCache = null;
+ try {
+ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
+
+ for (int i = 0; i < numBlocks; i++) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
+ blocks[i].getBlock());
+ }
+
+ // saves the current state
+ bucketCache.persistToFile();
+
+ // Create a new bucket which reads from persistence file.
+ newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(newBucketCache.waitForCacheInitialization(10000));
+ waitPersistentCacheValidation(conf, newBucketCache);
+ assertEquals(numBlocks, newBucketCache.backingMap.size());
+ for (int i = 0; i < numBlocks; i++) {
+ assertEquals(blocks[i].getBlock(),
+ newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
+ }
+ } finally {
+ if (bucketCache != null) {
+ bucketCache.shutdown();
+ }
+ if (newBucketCache != null) {
+ newBucketCache.shutdown();
+ }
}
TEST_UTIL.cleanupTestDir();
}
@@ -432,4 +493,8 @@ public class TestVerifyBucketCacheFile {
cache.cacheBlock(cacheKey, block);
waitUntilFlushedToBucket(cache, cacheKey);
}
+
+ private void waitPersistentCacheValidation(Configuration config, final BucketCache bucketCache) {
+ Waiter.waitFor(config, 5000, () -> bucketCache.getBackingMapValidated().get());
+ }
}