This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 9053aa1f482 HBASE-29183 Fix flakeyness on TestVerifyBucketCacheFile 
(#6787)
9053aa1f482 is described below

commit 9053aa1f48201fd354e1410f56fa9a3c4d6676fc
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Sun Mar 16 13:25:12 2025 +0000

    HBASE-29183 Fix flakeyness on TestVerifyBucketCacheFile (#6787)
    
    Signed-off-by: Duo Zhang <[email protected]>
    Change-Id: I077ecb4cf14a8c812e47794075857bd9d50458f0
---
 .../io/hfile/bucket/TestVerifyBucketCacheFile.java | 466 ++++++++++++---------
 1 file changed, 264 insertions(+), 202 deletions(-)

diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index b677468c042..98131cd2b7a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -98,63 +98,77 @@ public class TestVerifyBucketCacheFile {
     Path testDir = TEST_UTIL.getDataTestDir();
     TEST_UTIL.getTestFileSystem().mkdirs(testDir);
 
-    BucketCache bucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
-    long usedSize = bucketCache.getAllocator().getUsedSize();
-    assertEquals(0, usedSize);
-    CacheTestUtils.HFileBlockPair[] blocks =
-      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
-    // Add blocks
-    for (CacheTestUtils.HFileBlockPair block : blocks) {
-      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
-    }
-    usedSize = bucketCache.getAllocator().getUsedSize();
-    assertNotEquals(0, usedSize);
-    // 1.persist cache to file
-    bucketCache.shutdown();
-    // restore cache from file
-    bucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
-    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
-    // persist cache to file
-    bucketCache.shutdown();
-
-    // 2.delete bucket cache file
-    final java.nio.file.Path cacheFile =
-      FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
-    assertTrue(Files.deleteIfExists(cacheFile));
-    // can't restore cache from file
-    final BucketCache recoveredBucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
-    Waiter.waitFor(HBaseConfiguration.create(), 1000,
-      () -> recoveredBucketCache.getBackingMapValidated().get());
-    assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
-    assertEquals(0, recoveredBucketCache.backingMap.size());
-    // Add blocks
-    for (CacheTestUtils.HFileBlockPair block : blocks) {
-      cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, 
block.getBlockName(),
-        block.getBlock());
-    }
-    usedSize = recoveredBucketCache.getAllocator().getUsedSize();
-    assertNotEquals(0, usedSize);
-    // persist cache to file
-    recoveredBucketCache.shutdown();
-
-    // 3.delete backingMap persistence file
-    final java.nio.file.Path mapFile =
-      FileSystems.getDefault().getPath(testDir.toString(), 
"bucket.persistence");
-    assertTrue(Files.deleteIfExists(mapFile));
-    // can't restore cache from file
-    bucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
-    Thread.sleep(100);
-    assertEquals(0, bucketCache.getAllocator().getUsedSize());
-    assertEquals(0, bucketCache.backingMap.size());
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
 
+    BucketCache bucketCache = null;
+    BucketCache recoveredBucketCache = null;
+    try {
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      long usedSize = bucketCache.getAllocator().getUsedSize();
+      assertEquals(0, usedSize);
+      CacheTestUtils.HFileBlockPair[] blocks =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+      // Add blocks
+      for (CacheTestUtils.HFileBlockPair block : blocks) {
+        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
+      }
+      usedSize = bucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedSize);
+      // 1.persist cache to file
+      bucketCache.shutdown();
+      // restore cache from file
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      waitPersistentCacheValidation(conf, bucketCache);
+      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+      // persist cache to file
+      bucketCache.shutdown();
+
+      // 2.delete bucket cache file
+      final java.nio.file.Path cacheFile =
+        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
+      assertTrue(Files.deleteIfExists(cacheFile));
+      // can't restore cache from file
+      recoveredBucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      waitPersistentCacheValidation(conf, recoveredBucketCache);
+      assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
+      assertEquals(0, recoveredBucketCache.backingMap.size());
+      // Add blocks
+      for (CacheTestUtils.HFileBlockPair block : blocks) {
+        cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, 
block.getBlockName(),
+          block.getBlock());
+      }
+      usedSize = recoveredBucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedSize);
+      // persist cache to file
+      recoveredBucketCache.shutdown();
+
+      // 3.delete backingMap persistence file
+      final java.nio.file.Path mapFile =
+        FileSystems.getDefault().getPath(testDir.toString(), 
"bucket.persistence");
+      assertTrue(Files.deleteIfExists(mapFile));
+      // can't restore cache from file
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      waitPersistentCacheValidation(conf, bucketCache);
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      if (bucketCache != null) {
+        bucketCache.shutdown();
+      }
+      if (recoveredBucketCache != null) {
+        recoveredBucketCache.shutdown();
+      }
+    }
     TEST_UTIL.cleanupTestDir();
   }
 
@@ -167,32 +181,39 @@ public class TestVerifyBucketCacheFile {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
     String mapFileName = testDir + "/bucket.persistence" + 
EnvironmentEdgeManager.currentTime();
-    BucketCache bucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 
1000, conf);
-
-    long usedSize = bucketCache.getAllocator().getUsedSize();
-    assertEquals(0, usedSize);
-    CacheTestUtils.HFileBlockPair[] blocks =
-      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
-    // Add blocks
-    for (CacheTestUtils.HFileBlockPair block : blocks) {
-      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
+    BucketCache bucketCache = null;
+    try {
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
+      long usedSize = bucketCache.getAllocator().getUsedSize();
+      assertEquals(0, usedSize);
+      CacheTestUtils.HFileBlockPair[] blocks =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+      // Add blocks
+      for (CacheTestUtils.HFileBlockPair block : blocks) {
+        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
+      }
+      usedSize = bucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedSize);
+      // Shutdown BucketCache
+      bucketCache.shutdown();
+      // Delete the persistence file
+      File mapFile = new File(mapFileName);
+      assertTrue(mapFile.delete());
+      Thread.sleep(350);
+      // Create BucketCache
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
+      waitPersistentCacheValidation(conf, bucketCache);
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      if (bucketCache != null) {
+        bucketCache.shutdown();
+      }
     }
-    usedSize = bucketCache.getAllocator().getUsedSize();
-    assertNotEquals(0, usedSize);
-    // Shutdown BucketCache
-    bucketCache.shutdown();
-    // Delete the persistence file
-    File mapFile = new File(mapFileName);
-    assertTrue(mapFile.delete());
-    Thread.sleep(350);
-    // Create BucketCache
-    bucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 
1000, conf);
-    assertEquals(0, bucketCache.getAllocator().getUsedSize());
-    assertEquals(0, bucketCache.backingMap.size());
   }
 
   /**
@@ -211,37 +232,43 @@ public class TestVerifyBucketCacheFile {
     Configuration conf = HBaseConfiguration.create();
     // Disables the persister thread by setting its interval to MAX_VALUE
     conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
-    BucketCache bucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
-      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-      testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
-    long usedSize = bucketCache.getAllocator().getUsedSize();
-    assertEquals(0, usedSize);
-
-    CacheTestUtils.HFileBlockPair[] blocks =
-      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
-    // Add blocks
-    for (CacheTestUtils.HFileBlockPair block : blocks) {
-      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
-    }
-    usedSize = bucketCache.getAllocator().getUsedSize();
-    assertNotEquals(0, usedSize);
-    // persist cache to file
-    bucketCache.shutdown();
-
-    // modified bucket cache file
-    String file = testDir + "/bucket.cache";
-    try (BufferedWriter out =
-      new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, 
false)))) {
-      out.write("test bucket cache");
+    BucketCache bucketCache = null;
+    try {
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      long usedSize = bucketCache.getAllocator().getUsedSize();
+      assertEquals(0, usedSize);
+
+      CacheTestUtils.HFileBlockPair[] blocks =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+      // Add blocks
+      for (CacheTestUtils.HFileBlockPair block : blocks) {
+        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
+      }
+      usedSize = bucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedSize);
+      // persist cache to file
+      bucketCache.shutdown();
+
+      // modified bucket cache file
+      String file = testDir + "/bucket.cache";
+      try (BufferedWriter out =
+        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, 
false)))) {
+        out.write("test bucket cache");
+      }
+      // can't restore cache from file
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      waitPersistentCacheValidation(conf, bucketCache);
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      if (bucketCache != null) {
+        bucketCache.shutdown();
+      }
     }
-    // can't restore cache from file
-    bucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
-    Thread.sleep(100);
-    assertEquals(0, bucketCache.getAllocator().getUsedSize());
-    assertEquals(0, bucketCache.backingMap.size());
-
     TEST_UTIL.cleanupTestDir();
   }
 
@@ -264,39 +291,48 @@ public class TestVerifyBucketCacheFile {
     HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
     Path testDir = TEST_UTIL.getDataTestDir();
     TEST_UTIL.getTestFileSystem().mkdirs(testDir);
-
-    BucketCache bucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
-    long usedSize = bucketCache.getAllocator().getUsedSize();
-    assertEquals(0, usedSize);
-
-    Pair<String, Long> myPair = new Pair<>();
-
-    CacheTestUtils.HFileBlockPair[] blocks =
-      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
-    // Add blocks
-    for (CacheTestUtils.HFileBlockPair block : blocks) {
-      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    BucketCache bucketCache = null;
+    try {
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      long usedSize = bucketCache.getAllocator().getUsedSize();
+      assertEquals(0, usedSize);
+
+      Pair<String, Long> myPair = new Pair<>();
+
+      CacheTestUtils.HFileBlockPair[] blocks =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+      // Add blocks
+      for (CacheTestUtils.HFileBlockPair block : blocks) {
+        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
+      }
+      usedSize = bucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedSize);
+      long blockCount = bucketCache.backingMap.size();
+      assertNotEquals(0, blockCount);
+      // persist cache to file
+      bucketCache.shutdown();
+
+      // modified bucket cache file LastModifiedTime
+      final java.nio.file.Path file =
+        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
+      Files.setLastModifiedTime(file, 
FileTime.from(Instant.now().plusMillis(1_000)));
+      // can't restore cache from file
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      waitPersistentCacheValidation(conf, bucketCache);
+      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+      assertEquals(blockCount, bucketCache.backingMap.size());
+    } finally {
+      if (bucketCache != null) {
+        bucketCache.shutdown();
+      }
     }
-    usedSize = bucketCache.getAllocator().getUsedSize();
-    assertNotEquals(0, usedSize);
-    long blockCount = bucketCache.backingMap.size();
-    assertNotEquals(0, blockCount);
-    // persist cache to file
-    bucketCache.shutdown();
-
-    // modified bucket cache file LastModifiedTime
-    final java.nio.file.Path file =
-      FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
-    Files.setLastModifiedTime(file, 
FileTime.from(Instant.now().plusMillis(1_000)));
-    // can't restore cache from file
-    bucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
-    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
-    assertEquals(blockCount, bucketCache.backingMap.size());
-
     TEST_UTIL.cleanupTestDir();
   }
 
@@ -317,41 +353,52 @@ public class TestVerifyBucketCacheFile {
     // Disables the persister thread by setting its interval to MAX_VALUE
     conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
     String mapFileName = testDir + "/bucket.persistence" + 
EnvironmentEdgeManager.currentTime();
-    BucketCache bucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
-      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
-      DEFAULT_ERROR_TOLERATION_DURATION, conf);
-
-    CacheTestUtils.HFileBlockPair[] blocks =
-      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
-    // Add three blocks
-    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), 
blocks[0].getBlock());
-    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), 
blocks[1].getBlock());
-    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), 
blocks[2].getBlock());
-    // saves the current state
-    bucketCache.persistToFile();
-    // evicts first block
-    bucketCache.evictBlock(blocks[0].getBlockName());
-
-    // now adds a fourth block to bucket cache
-    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), 
blocks[3].getBlock());
-    // Creates new bucket cache instance without persisting to file after 
evicting first block
-    // and caching fourth block. So the bucket cache file has only the last 
three blocks,
-    // but backing map (containing cache keys) was persisted when first three 
blocks
-    // were in the cache. So the state on this recovery is:
-    // - Backing map: [block0, block1, block2]
-    // - Cache: [block1, block2, block3]
-    // Therefore, this bucket cache would be able to recover only block1 and 
block2.
-    BucketCache newBucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
-      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
-      DEFAULT_ERROR_TOLERATION_DURATION, conf);
-
-    assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, 
false));
-    assertEquals(blocks[1].getBlock(),
-      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
-    assertEquals(blocks[2].getBlock(),
-      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
-    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, 
false));
-    assertEquals(2, newBucketCache.backingMap.size());
+    BucketCache bucketCache = null;
+    BucketCache newBucketCache = null;
+    try {
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+      CacheTestUtils.HFileBlockPair[] blocks =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
+      // Add three blocks
+      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), 
blocks[0].getBlock());
+      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), 
blocks[1].getBlock());
+      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), 
blocks[2].getBlock());
+      // saves the current state
+      bucketCache.persistToFile();
+      // evicts first block
+      bucketCache.evictBlock(blocks[0].getBlockName());
+
+      // now adds a fourth block to bucket cache
+      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), 
blocks[3].getBlock());
+      // Creates new bucket cache instance without persisting to file after 
evicting first block
+      // and caching fourth block. So the bucket cache file has only the last 
three blocks,
+      // but backing map (containing cache keys) was persisted when first 
three blocks
+      // were in the cache. So the state on this recovery is:
+      // - Backing map: [block0, block1, block2]
+      // - Cache: [block1, block2, block3]
+      // Therefore, this bucket cache would be able to recover only block1 and 
block2.
+      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
+      waitPersistentCacheValidation(conf, newBucketCache);
+      assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, 
false, false));
+      assertEquals(blocks[1].getBlock(),
+        newBucketCache.getBlock(blocks[1].getBlockName(), false, false, 
false));
+      assertEquals(blocks[2].getBlock(),
+        newBucketCache.getBlock(blocks[2].getBlockName(), false, false, 
false));
+      assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, 
false, false));
+      assertEquals(2, newBucketCache.backingMap.size());
+    } finally {
+      if (bucketCache != null) {
+        bucketCache.shutdown();
+      }
+      if (newBucketCache != null) {
+        newBucketCache.shutdown();
+      }
+    }
     TEST_UTIL.cleanupTestDir();
   }
 
@@ -373,30 +420,41 @@ public class TestVerifyBucketCacheFile {
     conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);
 
     String mapFileName = testDir + "/bucket.persistence" + 
EnvironmentEdgeManager.currentTime();
-    BucketCache bucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
-      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
-      DEFAULT_ERROR_TOLERATION_DURATION, conf);
-
-    CacheTestUtils.HFileBlockPair[] blocks =
-      CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
-
-    for (int i = 0; i < numBlocks; i++) {
-      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), 
blocks[i].getBlock());
-    }
-
-    // saves the current state
-    bucketCache.persistToFile();
-
-    // Create a new bucket which reads from persistence file.
-    BucketCache newBucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
-      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
-      DEFAULT_ERROR_TOLERATION_DURATION, conf);
-
-    assertEquals(numBlocks, newBucketCache.backingMap.size());
-
-    for (int i = 0; i < numBlocks; i++) {
-      assertEquals(blocks[i].getBlock(),
-        newBucketCache.getBlock(blocks[i].getBlockName(), false, false, 
false));
+    BucketCache bucketCache = null;
+    BucketCache newBucketCache = null;
+    try {
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+      CacheTestUtils.HFileBlockPair[] blocks =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
+
+      for (int i = 0; i < numBlocks; i++) {
+        cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
+          blocks[i].getBlock());
+      }
+
+      // saves the current state
+      bucketCache.persistToFile();
+
+      // Create a new bucket which reads from persistence file.
+      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
+      waitPersistentCacheValidation(conf, newBucketCache);
+      assertEquals(numBlocks, newBucketCache.backingMap.size());
+      for (int i = 0; i < numBlocks; i++) {
+        assertEquals(blocks[i].getBlock(),
+          newBucketCache.getBlock(blocks[i].getBlockName(), false, false, 
false));
+      }
+    } finally {
+      if (bucketCache != null) {
+        bucketCache.shutdown();
+      }
+      if (newBucketCache != null) {
+        newBucketCache.shutdown();
+      }
     }
     TEST_UTIL.cleanupTestDir();
   }
@@ -415,4 +473,8 @@ public class TestVerifyBucketCacheFile {
     cache.cacheBlock(cacheKey, block);
     waitUntilFlushedToBucket(cache, cacheKey);
   }
+
+  private void waitPersistentCacheValidation(Configuration config, final 
BucketCache bucketCache) {
+    Waiter.waitFor(config, 5000, () -> 
bucketCache.getBackingMapValidated().get());
+  }
 }

Reply via email to