This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 30f596666c7 HBASE-28004 Persistent cache map can get corrupt if crash happens midway through the write (#5341)
30f596666c7 is described below

commit 30f596666c7aa5b8b8fa7e55bee9448456c3eeb9
Author: Wellington Ramos Chevreuil <wchevre...@apache.org>
AuthorDate: Wed Aug 23 10:17:21 2023 +0100

    HBASE-28004 Persistent cache map can get corrupt if crash happens midway through the write (#5341)
    
    Signed-off-by: Ankit Singhal <an...@apache.org>
    Reviewed-by: Rahul Agarkar <rahul.agar...@gmail.com>
---
 .../main/protobuf/server/io/BucketCacheEntry.proto |   3 +
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java  |   2 -
 .../hadoop/hbase/io/hfile/HFilePreadReader.java    |  41 +++-
 .../hadoop/hbase/io/hfile/PrefetchExecutor.java    |  89 +--------
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 167 ++++++++++++----
 .../hadoop/hbase/io/hfile/bucket/BucketEntry.java  |  28 ++-
 .../hbase/io/hfile/bucket/BucketProtoUtils.java    |  10 +-
 .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java |  37 +++-
 .../hfile/TestBlockEvictionOnRegionMovement.java   |   1 -
 .../hadoop/hbase/io/hfile/TestPrefetchRSClose.java |   4 -
 .../io/hfile/TestPrefetchWithBucketCache.java      | 211 +++++++++++++++++++++
 .../hbase/io/hfile/bucket/TestBucketCache.java     |  84 +++++---
 .../io/hfile/bucket/TestBucketCachePersister.java  |   9 +-
 .../io/hfile/bucket/TestByteBufferIOEngine.java    |   2 +-
 .../io/hfile/bucket/TestPrefetchPersistence.java   |   9 +-
 .../hadoop/hbase/io/hfile/bucket/TestRAMCache.java |   2 +-
 .../io/hfile/bucket/TestVerifyBucketCacheFile.java | 102 ++++++++--
 17 files changed, 601 insertions(+), 200 deletions(-)

diff --git a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto
index 038c6ca3f04..ae1980fe51e 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto
@@ -32,6 +32,7 @@ message BucketCacheEntry {
   map<int32, string> deserializers = 4;
   required BackingMap backing_map = 5;
   optional bytes checksum = 6;
+  map<string, bool> prefetched_files = 7;
 }
 
 message BackingMap {
@@ -71,6 +72,8 @@ message BucketEntry {
   required int64 access_counter = 3;
   required int32 deserialiser_index = 4;
   required BlockPriority priority = 5;
+  required int64 cachedTime = 6;
+  optional int32 disk_size_with_header = 7;
 }
 
 enum BlockPriority {
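
The additions above are the schema half of the fix: the set of fully prefetched files plus each entry's cache time and on-disk size now ride along in the persisted BucketCacheEntry. A minimal, illustrative sketch (not part of the patch; values are placeholders) of how the new BucketEntry fields travel through the generated protobuf API that BucketProtoUtils uses further down:

    // Illustrative only: builder/getter names are the ones generated from the
    // message definitions above, as used later in this patch.
    BucketCacheProtos.BucketEntry pbEntry = BucketCacheProtos.BucketEntry.newBuilder()
      .setOffset(0).setLength(1024).setAccessCounter(1).setDeserialiserIndex(1)
      .setPriority(BucketCacheProtos.BlockPriority.memory)
      .setCachedTime(System.nanoTime())      // new required field (6)
      .setDiskSizeWithHeader(1024)           // new optional field (7)
      .build();
    long cachedTime = pbEntry.getCachedTime();
    int diskSize = pbEntry.getDiskSizeWithHeader();
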
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 15c64c03d5e..57f91fa19f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -93,8 +93,6 @@ public class CacheConfig {
   public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
     "hbase.hfile.drop.behind.compaction";
 
-  public static final String PREFETCH_PERSISTENCE_PATH_KEY = 
"hbase.prefetch.file.list.path";
-
   /**
    * Configuration key to set interval for persisting bucket cache to disk.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 2c71ce9f484..f9c0ae59242 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import java.io.IOException;
+import java.util.Optional;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +39,14 @@ public class HFilePreadReader extends HFileReaderImpl {
   public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, 
CacheConfig cacheConf,
     Configuration conf) throws IOException {
     super(context, fileInfo, cacheConf, conf);
+    final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
+    BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent(bc -> 
fileAlreadyCached
+      .setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : 
true));
     // Prefetch file blocks upon open if requested
-    if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()) {
+    if (
+      cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()
+        && !fileAlreadyCached.booleanValue()
+    ) {
       PrefetchExecutor.request(path, new Runnable() {
         @Override
         public void run() {
@@ -55,12 +65,36 @@ public class HFilePreadReader extends HFileReaderImpl {
             if (LOG.isTraceEnabled()) {
               LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, 
end));
             }
+            Optional<BucketCache> bucketCacheOptional =
+              BucketCache.getBuckedCacheFromCacheConfig(cacheConf);
             // Don't use BlockIterator here, because it's designed to read 
load-on-open section.
             long onDiskSizeOfNextBlock = -1;
             while (offset < end) {
               if (Thread.interrupted()) {
                 break;
               }
+              // BucketCache can be persistent and resilient to restarts, so 
we check first if the
+              // block exists on its in-memory index, if so, we just update 
the offset and move on
+              // to the next block without actually going read all the way to 
the cache.
+              if (bucketCacheOptional.isPresent()) {
+                BucketCache cache = bucketCacheOptional.get();
+                BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
+                BucketEntry entry = cache.getBackingMap().get(cacheKey);
+                if (entry != null) {
+                  cacheKey = new BlockCacheKey(name, offset);
+                  entry = cache.getBackingMap().get(cacheKey);
+                  if (entry == null) {
+                    LOG.debug("No cache key {}, we'll read and cache it", 
cacheKey);
+                  } else {
+                    offset += entry.getOnDiskSizeWithHeader();
+                    LOG.debug("Found cache key {}. Skipping prefetch, the 
block is already cached.",
+                      cacheKey);
+                    continue;
+                  }
+                } else {
+                  LOG.debug("No entry in the backing map for cache key {}", 
cacheKey);
+                }
+              }
               // Perhaps we got our block from cache? Unlikely as this may be, 
if it happens, then
               // the internal-to-hfileblock thread local which holds the 
overread that gets the
               // next header, will not have happened...so, pass in the 
onDiskSize gotten from the
@@ -77,12 +111,15 @@ public class HFilePreadReader extends HFileReaderImpl {
                 block.release();
               }
             }
+            BucketCache.getBuckedCacheFromCacheConfig(cacheConf)
+              .ifPresent(bc -> bc.fileCacheCompleted(path.getName()));
+
           } catch (IOException e) {
             // IOExceptions are probably due to region closes (relocation, 
etc.)
             if (LOG.isTraceEnabled()) {
               LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end), 
e);
             }
-          } catch (Exception e) {
+          } catch (Throwable e) {
             // Other exceptions are interesting
             LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
           } finally {
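
The hunks above make the prefetch runnable restart-aware: at open time it asks the BucketCache whether the file is already fully cached, and inside the loop it skips any block whose key is already present in the cache's backing map, advancing the offset by that entry's on-disk size. A condensed, illustrative restatement of the per-block check (not a drop-in replacement for the hunk):

    Optional<BucketCache> bucketCacheOptional =
      BucketCache.getBuckedCacheFromCacheConfig(cacheConf);
    if (bucketCacheOptional.isPresent()) {
      BucketCache cache = bucketCacheOptional.get();
      BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
      BucketEntry entry = cache.getBackingMap().get(cacheKey);
      if (entry != null) {
        // Block survived in the persistent cache; skip reading it again.
        offset += entry.getOnDiskSizeWithHeader();
        continue;
      }
    }
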
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
index d3064e066a1..02fbc12e85c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Future;
@@ -42,8 +37,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
-
 @InterfaceAudience.Private
 public final class PrefetchExecutor {
 
@@ -51,16 +44,12 @@ public final class PrefetchExecutor {
 
   /** Futures for tracking block prefetch activity */
   private static final Map<Path, Future<?>> prefetchFutures = new 
ConcurrentSkipListMap<>();
-  /** Set of files for which prefetch is completed */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"MS_SHOULD_BE_FINAL")
-  private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
   /** Executor pool shared among all HFiles for block prefetch */
   private static final ScheduledExecutorService prefetchExecutorPool;
   /** Delay before beginning prefetch */
   private static final int prefetchDelayMillis;
   /** Variation in prefetch delay times, to mitigate stampedes */
   private static final float prefetchDelayVariation;
-  static String prefetchedFileListPath;
   static {
     // Consider doing this on demand with a configuration passed in rather
     // than in a static initializer.
@@ -90,13 +79,6 @@ public final class PrefetchExecutor {
       + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + 
Path.SEPARATOR_CHAR + ")");
 
   public static void request(Path path, Runnable runnable) {
-    if (prefetchCompleted != null) {
-      if (isFilePrefetched(path.getName())) {
-        LOG.info(
-          "File has already been prefetched before the restart, so skipping 
prefetch : " + path);
-        return;
-      }
-    }
     if (!prefetchPathExclude.matcher(path.toString()).find()) {
       long delay;
       if (prefetchDelayMillis > 0) {
@@ -122,8 +104,9 @@ public final class PrefetchExecutor {
 
   public static void complete(Path path) {
     prefetchFutures.remove(path);
-    prefetchCompleted.put(path.getName(), true);
-    LOG.debug("Prefetch completed for {}", path.getName());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Prefetch completed for {}", path.getName());
+    }
   }
 
   public static void cancel(Path path) {
@@ -134,8 +117,6 @@ public final class PrefetchExecutor {
       prefetchFutures.remove(path);
       LOG.debug("Prefetch cancelled for {}", path);
     }
-    LOG.debug("Removing filename from the prefetched persistence list: {}", 
path.getName());
-    removePrefetchedFileWhileEvict(path.getName());
   }
 
   public static boolean isCompleted(Path path) {
@@ -146,70 +127,6 @@ public final class PrefetchExecutor {
     return true;
   }
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"OBL_UNSATISFIED_OBLIGATION",
-      justification = "false positive, try-with-resources ensures close is 
called.")
-  public static void persistToFile(String path) throws IOException {
-    prefetchedFileListPath = path;
-    if (prefetchedFileListPath == null) {
-      LOG.info("Exception while persisting prefetch!");
-      throw new IOException("Error persisting prefetched HFiles set!");
-    }
-    if (!prefetchCompleted.isEmpty()) {
-      try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, 
false)) {
-        PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
-      }
-    }
-  }
-
-  public static void retrieveFromFile(String path) throws IOException {
-    prefetchedFileListPath = path;
-    File prefetchPersistenceFile = new File(prefetchedFileListPath);
-    if (!prefetchPersistenceFile.exists()) {
-      LOG.warn("Prefetch persistence file does not exist!");
-      return;
-    }
-    LOG.info("Retrieving from prefetch persistence file " + path);
-    assert (prefetchedFileListPath != null);
-    try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
-      PersistentPrefetchProtos.PrefetchedHfileName proto =
-        PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
-      Map<String, Boolean> protoPrefetchedFilesMap = 
proto.getPrefetchedFilesMap();
-      prefetchCompleted.putAll(protoPrefetchedFilesMap);
-    }
-  }
-
-  private static FileInputStream deleteFileOnClose(final File file) throws 
IOException {
-    return new FileInputStream(file) {
-      private File myFile;
-
-      private FileInputStream init(File file) {
-        myFile = file;
-        return this;
-      }
-
-      @Override
-      public void close() throws IOException {
-        if (myFile == null) {
-          return;
-        }
-
-        super.close();
-        if (!myFile.delete()) {
-          throw new IOException("Failed deleting persistence file " + 
myFile.getAbsolutePath());
-        }
-        myFile = null;
-      }
-    }.init(file);
-  }
-
-  public static void removePrefetchedFileWhileEvict(String hfileName) {
-    prefetchCompleted.remove(hfileName);
-  }
-
-  public static boolean isFilePrefetched(String hfileName) {
-    return prefetchCompleted.containsKey(hfileName);
-  }
-
   private PrefetchExecutor() {
   }
 }
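
With the removals above, PrefetchExecutor no longer keeps or persists its own prefetchCompleted map; that state now lives in BucketCache.fullyCachedFiles and is written out together with the backing map. The call sites (in HFilePreadReader, per the hunks earlier in this patch) reduce to roughly the following sketch:

    // On prefetch completion for a file:
    BucketCache.getBuckedCacheFromCacheConfig(cacheConf)
      .ifPresent(bc -> bc.fileCacheCompleted(path.getName()));

    // When deciding whether to prefetch at open time:
    boolean alreadyCached = BucketCache.getBuckedCacheFromCacheConfig(cacheConf)
      .map(bc -> bc.getFullyCachedFiles().containsKey(path.getName()))
      .orElse(false);
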
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 14c4c44ee16..bc5e7e7c9b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
-import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -32,6 +31,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -52,6 +52,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
@@ -62,12 +63,13 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
 import org.apache.hadoop.hbase.io.hfile.BlockPriority;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
+import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@@ -143,8 +145,14 @@ public class BucketCache implements BlockCache, HeapSize {
 
   // Store the block in this map before writing it to cache
   transient final RAMCache ramCache;
+
   // In this map, store the block's meta data like offset, length
-  transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
+  transient Map<BlockCacheKey, BucketEntry> backingMap;
+
+  /** Set of files for which prefetch is completed */
+  final Map<String, Boolean> fullyCachedFiles = new ConcurrentHashMap<>();
+
+  private BucketCachePersister cachePersister;
 
   /**
    * Flag if the cache is enabled or not... We shut it off if there are IO 
errors for some time, so
@@ -177,9 +185,6 @@ public class BucketCache implements BlockCache, HeapSize {
   private static final int DEFAULT_CACHE_WAIT_TIME = 50;
 
   private final BucketCacheStats cacheStats = new BucketCacheStats();
-
-  /** BucketCache persister thread */
-  private BucketCachePersister cachePersister;
   private final String persistencePath;
   static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
   private final long cacheCapacity;
@@ -239,8 +244,6 @@ public class BucketCache implements BlockCache, HeapSize {
   /** In-memory bucket size */
   private float memoryFactor;
 
-  private String prefetchedFileListPath;
-
   private long bucketcachePersistInterval;
 
   private static final String FILE_VERIFY_ALGORITHM =
@@ -293,7 +296,6 @@ public class BucketCache implements BlockCache, HeapSize {
     this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, 
DEFAULT_MEMORY_FACTOR);
     this.queueAdditionWaitTime =
       conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
-    this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
     this.bucketcachePersistInterval = 
conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
 
     sanityCheckConfigs();
@@ -320,11 +322,15 @@ public class BucketCache implements BlockCache, HeapSize {
 
     this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
 
-    if (ioEngine.isPersistent() && persistencePath != null) {
-      startBucketCachePersisterThread();
+    if (isCachePersistent()) {
+      if (ioEngine instanceof FileIOEngine) {
+        startBucketCachePersisterThread();
+      }
       try {
         retrieveFromFile(bucketSizes);
       } catch (IOException ioex) {
+        backingMap.clear();
+        fullyCachedFiles.clear();
         LOG.error("Can't restore from file[" + persistencePath + "] because of 
", ioex);
       }
     }
@@ -429,7 +435,7 @@ public class BucketCache implements BlockCache, HeapSize {
   }
 
   public boolean isCachePersistenceEnabled() {
-    return (prefetchedFileListPath != null) && (persistencePath != null);
+    return persistencePath != null;
   }
 
   /**
@@ -504,8 +510,8 @@ public class BucketCache implements BlockCache, HeapSize {
     }
     LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
     // Stuff the entry into the RAM cache so it can get drained to the 
persistent store
-    RAMQueueEntry re =
-      new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), 
inMemory);
+    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, 
accessCount.incrementAndGet(),
+      inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine);
     /**
      * Don't use ramCache.put(cacheKey, re) here. because there may be a 
existing entry with same
      * key in ramCache, the heap size of bucket cache need to update if 
replacing entry from
@@ -589,6 +595,12 @@ public class BucketCache implements BlockCache, HeapSize {
           }
           return cachedBlock;
         }
+      } catch (HBaseIOException hioex) {
+        // When using file io engine persistent cache,
+        // the cache map state might differ from the actual cache. If we reach 
this block,
+        // we should remove the cache key entry from the backing map
+        backingMap.remove(key);
+        LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
       } catch (IOException ioex) {
         LOG.error("Failed reading block " + key + " from bucket cache", ioex);
         checkIOErrorIsTolerated();
@@ -616,13 +628,15 @@ public class BucketCache implements BlockCache, HeapSize {
       cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
     }
     if (ioEngine.isPersistent()) {
-      if (prefetchedFileListPath != null) {
-        
PrefetchExecutor.removePrefetchedFileWhileEvict(cacheKey.getHfileName());
-      }
+      fullyCachedFiles.remove(cacheKey.getHfileName());
       setCacheInconsistent(true);
     }
   }
 
+  public void fileCacheCompleted(String fileName) {
+    fullyCachedFiles.put(fileName, true);
+  }
+
   /**
    * Free the {{@link BucketEntry} actually,which could only be invoked when 
the
    * {@link BucketEntry#refCnt} becoming 0.
@@ -1252,18 +1266,24 @@ public class BucketCache implements BlockCache, HeapSize {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"OBL_UNSATISFIED_OBLIGATION",
       justification = "false positive, try-with-resources ensures close is 
called.")
   void persistToFile() throws IOException {
-    if (!ioEngine.isPersistent()) {
+    if (!isCachePersistent()) {
       throw new IOException("Attempt to persist non-persistent cache 
mappings!");
     }
-    try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
+    File tempPersistencePath = new File(persistencePath + 
EnvironmentEdgeManager.currentTime());
+    try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, 
false)) {
       fos.write(ProtobufMagic.PB_MAGIC);
       BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
     }
-    if (prefetchedFileListPath != null) {
-      PrefetchExecutor.persistToFile(prefetchedFileListPath);
+    if (!tempPersistencePath.renameTo(new File(persistencePath))) {
+      LOG.warn("Failed to commit cache persistent file. We might lose cached 
blocks if "
+        + "RS crashes/restarts before we successfully checkpoint again.");
     }
   }
 
+  private boolean isCachePersistent() {
+    return ioEngine.isPersistent() && persistencePath != null;
+  }
+
   /**
    * @see #persistToFile()
    */
@@ -1273,9 +1293,6 @@ public class BucketCache implements BlockCache, HeapSize {
       return;
     }
     assert !cacheEnabled;
-    if (prefetchedFileListPath != null) {
-      PrefetchExecutor.retrieveFromFile(prefetchedFileListPath);
-    }
 
     try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
       int pblen = ProtobufMagic.lengthOfPBMagic();
@@ -1358,16 +1375,37 @@ public class BucketCache implements BlockCache, HeapSize {
   }
 
   private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws 
IOException {
+    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), 
proto.getBackingMap(),
+      this::createRecycler);
+    fullyCachedFiles.clear();
+    fullyCachedFiles.putAll(proto.getPrefetchedFilesMap());
     if (proto.hasChecksum()) {
-      ((PersistentIOEngine) 
ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
-        algorithm);
+      try {
+        ((PersistentIOEngine) 
ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
+          algorithm);
+      } catch (IOException e) {
+        LOG.warn("Checksum for cache file failed. "
+          + "We need to validate each cache key in the backing map. This may 
take some time...");
+        long startTime = EnvironmentEdgeManager.currentTime();
+        int totalKeysOriginally = backingMap.size();
+        for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : 
backingMap.entrySet()) {
+          try {
+            ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
+          } catch (IOException e1) {
+            LOG.debug("Check for key {} failed. Removing it from map.", 
keyEntry.getKey());
+            backingMap.remove(keyEntry.getKey());
+            fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
+          }
+        }
+        LOG.info("Finished validating {} keys in the backing map. Recovered: 
{}. This took {}ms.",
+          totalKeysOriginally, backingMap.size(),
+          (EnvironmentEdgeManager.currentTime() - startTime));
+      }
     } else {
       // if has not checksum, it means the persistence file is old format
       LOG.info("Persistent file is old format, it does not support verifying 
file integrity!");
     }
     verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), 
proto.getMapClass());
-    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), 
proto.getBackingMap(),
-      this::createRecycler);
   }
 
   /**
@@ -1403,6 +1441,7 @@ public class BucketCache implements BlockCache, HeapSize {
     if (!ioEngine.isPersistent() || persistencePath == null) {
       // If persistent ioengine and a path, we will serialize out the 
backingMap.
       this.backingMap.clear();
+      this.fullyCachedFiles.clear();
     }
   }
 
@@ -1417,7 +1456,9 @@ public class BucketCache implements BlockCache, HeapSize {
     LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() 
+ "; path to write="
       + persistencePath);
     if (ioEngine.isPersistent() && persistencePath != null) {
-      cachePersister.interrupt();
+      if (cachePersister != null) {
+        cachePersister.interrupt();
+      }
       try {
         join();
         persistToFile();
@@ -1429,6 +1470,17 @@ public class BucketCache implements BlockCache, HeapSize {
     }
   }
 
+  /**
+   * Needed mostly for UTs that might run in the same VM and create different 
BucketCache instances
+   * on different UT methods.
+   */
+  @Override
+  protected void finalize() {
+    if (cachePersister != null && !cachePersister.isInterrupted()) {
+      cachePersister.interrupt();
+    }
+  }
+
   @Override
   public CacheStats getStats() {
     return cacheStats;
@@ -1485,7 +1537,7 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   @Override
   public int evictBlocksByHfileName(String hfileName) {
-    PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName);
+    this.fullyCachedFiles.remove(hfileName);
     Set<BlockCacheKey> keySet = blocksByHFile.subSet(new 
BlockCacheKey(hfileName, Long.MIN_VALUE),
       true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
 
@@ -1556,12 +1608,15 @@ public class BucketCache implements BlockCache, HeapSize {
     private final Cacheable data;
     private long accessCounter;
     private boolean inMemory;
+    private boolean isCachePersistent;
 
-    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, 
boolean inMemory) {
+    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, 
boolean inMemory,
+      boolean isCachePersistent) {
       this.key = bck;
       this.data = data;
       this.accessCounter = accessCounter;
       this.inMemory = inMemory;
+      this.isCachePersistent = isCachePersistent;
     }
 
     public Cacheable getData() {
@@ -1591,12 +1646,19 @@ public class BucketCache implements BlockCache, HeapSize {
       if (len == 0) {
         return null;
       }
+      if (isCachePersistent && data instanceof HFileBlock) {
+        len += Long.BYTES; // we need to record the cache time for consistency 
check in case of
+                           // recovery
+      }
       long offset = alloc.allocateBlock(len);
       boolean succ = false;
       BucketEntry bucketEntry = null;
       try {
-        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, 
createRecycler,
-          getByteBuffAllocator());
+        int diskSizeWithHeader = (data instanceof HFileBlock)
+          ? ((HFileBlock) data).getOnDiskSizeWithHeader()
+          : data.getSerializedLength();
+        bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, 
accessCounter, inMemory,
+          createRecycler, getByteBuffAllocator());
         bucketEntry.setDeserializerReference(data.getDeserializer());
         if (data instanceof HFileBlock) {
           // If an instance of HFileBlock, save on some allocations.
@@ -1604,7 +1666,16 @@ public class BucketCache implements BlockCache, HeapSize {
           ByteBuff sliceBuf = block.getBufferReadOnly();
           block.getMetaData(metaBuff);
           ioEngine.write(sliceBuf, offset);
-          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
+          // adds the cache time after the block and metadata part
+          if (isCachePersistent) {
+            ioEngine.write(metaBuff, offset + len - metaBuff.limit() - 
Long.BYTES);
+            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+            buffer.putLong(bucketEntry.getCachedTime());
+            buffer.rewind();
+            ioEngine.write(buffer, (offset + len - Long.BYTES));
+          } else {
+            ioEngine.write(metaBuff, offset + len - metaBuff.limit());
+          }
         } else {
           // Only used for testing.
           ByteBuffer bb = ByteBuffer.allocate(len);
@@ -1760,6 +1831,10 @@ public class BucketCache implements BlockCache, HeapSize {
     return memoryFactor;
   }
 
+  public String getPersistencePath() {
+    return persistencePath;
+  }
+
   /**
    * Wrapped the delegate ConcurrentMap with maintaining its block's reference 
count.
    */
@@ -1837,4 +1912,28 @@ public class BucketCache implements BlockCache, HeapSize {
       }
     }
   }
+
+  public Map<BlockCacheKey, BucketEntry> getBackingMap() {
+    return backingMap;
+  }
+
+  public Map<String, Boolean> getFullyCachedFiles() {
+    return fullyCachedFiles;
+  }
+
+  public static Optional<BucketCache> 
getBuckedCacheFromCacheConfig(CacheConfig cacheConf) {
+    if (cacheConf.getBlockCache().isPresent()) {
+      BlockCache bc = cacheConf.getBlockCache().get();
+      if (bc instanceof CombinedBlockCache) {
+        BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache();
+        if (l2 instanceof BucketCache) {
+          return Optional.of((BucketCache) l2);
+        }
+      } else if (bc instanceof BucketCache) {
+        return Optional.of((BucketCache) bc);
+      }
+    }
+    return Optional.empty();
+  }
+
 }
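
The persistence changes above are the core of HBASE-28004: persistToFile() no longer truncates the live persistence file in place; it writes the snapshot to a temporary file and renames it over the old one, so a crash mid-write leaves the previous consistent snapshot intact. On startup, retrieveFromFile() clears backingMap and fullyCachedFiles if the file cannot be parsed, and parsePB() falls back to validating each key via FileIOEngine.checkCacheTime() when the whole-file checksum does not match. A simplified sketch of the checkpoint pattern as it appears in persistToFile():

    // Write the snapshot to a timestamp-suffixed temp file, never to the live file.
    File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
    try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
      fos.write(ProtobufMagic.PB_MAGIC);
      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
    }
    // The rename is the commit point: crash before it and the old snapshot survives,
    // crash after it and the new snapshot is complete.
    if (!tempPersistencePath.renameTo(new File(persistencePath))) {
      LOG.warn("Failed to commit cache persistent file.");
    }
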
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
index a04a32bfe64..c93dac8a572 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -43,13 +43,15 @@ import org.apache.yetus.audience.InterfaceAudience;
  * bytes gives us 256TB or so.
  */
 @InterfaceAudience.Private
-class BucketEntry implements HBaseReferenceCounted {
+public class BucketEntry implements HBaseReferenceCounted {
   // access counter comparator, descending order
   static final Comparator<BucketEntry> COMPARATOR =
     Comparator.comparingLong(BucketEntry::getAccessCounter).reversed();
 
   private int offsetBase;
   private int length;
+
+  private int onDiskSizeWithHeader;
   private byte offset1;
 
   /**
@@ -91,24 +93,32 @@ class BucketEntry implements HBaseReferenceCounted {
   /**
    * Time this block was cached. Presumes we are created just before we are 
added to the cache.
    */
-  private final long cachedTime = System.nanoTime();
+  private long cachedTime = System.nanoTime();
 
   /**
    * @param createRecycler used to free this {@link BucketEntry} when {@link 
BucketEntry#refCnt}
    *                       becoming 0. NOTICE that {@link 
ByteBuffAllocator#NONE} could only be used
    *                       for test.
    */
-  BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
-    Function<BucketEntry, Recycler> createRecycler, ByteBuffAllocator 
allocator) {
+  BucketEntry(long offset, int length, int onDiskSizeWithHeader, long 
accessCounter,
+    boolean inMemory, Function<BucketEntry, Recycler> createRecycler, 
ByteBuffAllocator allocator) {
+    this(offset, length, onDiskSizeWithHeader, accessCounter, 
System.nanoTime(), inMemory,
+      createRecycler, allocator);
+  }
+
+  BucketEntry(long offset, int length, int onDiskSizeWithHeader, long 
accessCounter,
+    long cachedTime, boolean inMemory, Function<BucketEntry, Recycler> 
createRecycler,
+    ByteBuffAllocator allocator) {
     if (createRecycler == null) {
       throw new IllegalArgumentException("createRecycler could not be null!");
     }
     setOffset(offset);
     this.length = length;
+    this.onDiskSizeWithHeader = onDiskSizeWithHeader;
     this.accessCounter = accessCounter;
+    this.cachedTime = cachedTime;
     this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
     this.refCnt = RefCnt.create(createRecycler.apply(this));
-
     this.markedAsEvicted = new AtomicBoolean(false);
     this.allocator = allocator;
   }
@@ -159,10 +169,14 @@ class BucketEntry implements HBaseReferenceCounted {
     return this.priority;
   }
 
-  long getCachedTime() {
+  public long getCachedTime() {
     return cachedTime;
   }
 
+  public int getOnDiskSizeWithHeader() {
+    return onDiskSizeWithHeader;
+  }
+
   /**
    * The {@link BucketCache} will try to release its reference to this 
BucketEntry many times. we
    * must make sure the idempotent, otherwise it'll decrease the RPC's 
reference count in advance,
@@ -239,7 +253,7 @@ class BucketEntry implements HBaseReferenceCounted {
    * also release its refCnt (case.1 will do this) and no other rpc reference, 
then it will free the
    * area in bucketAllocator. <br>
    * 3.evict those block without any rpc reference if cache size exceeded. 
we'll only free those
-   * blocks with zero rpc reference count, as the {@link 
BucketEntry#markStaleAsEvicted()} do.
+   * blocks with zero rpc reference count.
    * @return true to indicate we've decreased to zero and do the de-allocation.
    */
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index ff4e90b8865..8830e5d3255 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -45,6 +45,7 @@ final class BucketProtoUtils {
       .setIoClass(cache.ioEngine.getClass().getName())
       .setMapClass(cache.backingMap.getClass().getName())
       .putAllDeserializers(CacheableDeserializerIdManager.save())
+      .putAllPrefetchedFiles(cache.fullyCachedFiles)
       .setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
       .setChecksum(ByteString
         .copyFrom(((PersistentIOEngine) 
cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
@@ -99,8 +100,10 @@ final class BucketProtoUtils {
 
   private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) {
     return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset())
-      
.setLength(entry.getLength()).setDeserialiserIndex(entry.deserializerIndex)
-      
.setAccessCounter(entry.getAccessCounter()).setPriority(toPB(entry.getPriority())).build();
+      .setCachedTime(entry.getCachedTime()).setLength(entry.getLength())
+      .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader())
+      
.setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter())
+      .setPriority(toPB(entry.getPriority())).build();
   }
 
   private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
@@ -128,7 +131,8 @@ final class BucketProtoUtils {
       // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the 
ByteBuffAllocator
       // which created by RpcServer elegantly.
       BucketEntry value = new BucketEntry(protoValue.getOffset(), 
protoValue.getLength(),
-        protoValue.getAccessCounter(),
+        protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(),
+        protoValue.getCachedTime(),
         protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, 
createRecycler,
         ByteBuffAllocator.HEAP);
       // This is the deserializer that we stored
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 370343b1b25..38f9db04b6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -26,6 +26,7 @@ import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -49,6 +50,7 @@ public class FileIOEngine extends PersistentIOEngine {
 
   private final long sizePerFile;
   private final long capacity;
+  private boolean maintainPersistence;
 
   private FileReadAccessor readAccessor = new FileReadAccessor();
   private FileWriteAccessor writeAccessor = new FileWriteAccessor();
@@ -59,6 +61,7 @@ public class FileIOEngine extends PersistentIOEngine {
     this.sizePerFile = capacity / filePaths.length;
     this.capacity = this.sizePerFile * filePaths.length;
     this.fileChannels = new FileChannel[filePaths.length];
+    this.maintainPersistence = maintainPersistence;
     if (!maintainPersistence) {
       for (String filePath : filePaths) {
         File file = new File(filePath);
@@ -145,10 +148,42 @@ public class FileIOEngine extends PersistentIOEngine {
         throw ioe;
       }
     }
-    dstBuff.rewind();
+    if (maintainPersistence) {
+      dstBuff.position(length - Long.BYTES);
+      long cachedNanoTime = dstBuff.getLong();
+      if (be.getCachedTime() != cachedNanoTime) {
+        dstBuff.release();
+        throw new HBaseIOException("The cached time recorded within the cached 
block differs "
+          + "from its bucket entry, so it might not be the same.");
+      }
+      dstBuff.rewind();
+      dstBuff.limit(length - Long.BYTES);
+      dstBuff = dstBuff.slice();
+    } else {
+      dstBuff.rewind();
+    }
     return be.wrapAsCacheable(dstBuff);
   }
 
+  void checkCacheTime(BucketEntry be) throws IOException {
+    long offset = be.offset();
+    int length = be.getLength();
+    ByteBuff dstBuff = be.allocator.allocate(Long.BYTES);
+    try {
+      accessFile(readAccessor, dstBuff, (offset + length - Long.BYTES));
+    } catch (IOException ioe) {
+      dstBuff.release();
+      throw ioe;
+    }
+    dstBuff.rewind();
+    long cachedNanoTime = dstBuff.getLong();
+    if (be.getCachedTime() != cachedNanoTime) {
+      dstBuff.release();
+      throw new HBaseIOException("The cached time recorded within the cached 
block differs "
+        + "from its bucket entry, so it might not be the same.");
+    }
+  }
+
   void closeFileChannels() {
     for (FileChannel fileChannel : fileChannels) {
       try {
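
When the cache is persistent, FileIOEngine now appends the entry's cachedTime (8 bytes) after the block and its metadata, and both read() and the new checkCacheTime() compare that trailer against the in-memory BucketEntry; a mismatch means the slot no longer holds the block the snapshot recorded, so the entry must be dropped. An illustrative view of the slot layout and the verification step (not literal code from the patch):

    // Slot layout with persistence enabled:
    //   [ block bytes ][ HFileBlock metadata ][ 8-byte cachedTime ]
    //   <-------------------- be.getLength() -------------------->
    ByteBuff trailer = be.allocator.allocate(Long.BYTES);
    accessFile(readAccessor, trailer, be.offset() + be.getLength() - Long.BYTES);
    trailer.rewind();
    long cachedNanoTime = trailer.getLong();
    if (be.getCachedTime() != cachedNanoTime) {
      trailer.release();
      throw new HBaseIOException("Cached time mismatch for entry at offset " + be.offset());
    }
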
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
index 66b2ca73ded..eb3e3cc61f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
@@ -79,7 +79,6 @@ public class TestBlockEvictionOnRegionMovement {
     conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
     conf.setInt("hbase.bucketcache.size", 400);
     conf.set("hbase.bucketcache.persistent.path", testDir + 
"/bucket.persistence");
-    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + 
"/prefetch.persistence");
     conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100);
     conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true);
     zkCluster = TEST_UTIL.startMiniZKCluster();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
index b10186996ed..64db9158333 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
@@ -75,7 +75,6 @@ public class TestPrefetchRSClose {
     conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
     conf.setInt("hbase.bucketcache.size", 400);
     conf.set("hbase.bucketcache.persistent.path", testDir + 
"/bucket.persistence");
-    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + 
"/prefetch.persistence");
     zkCluster = TEST_UTIL.startMiniZKCluster();
     cluster = TEST_UTIL.startMiniHBaseCluster(option);
     assertEquals(2, cluster.getRegionServerThreads().size());
@@ -114,18 +113,15 @@ public class TestPrefetchRSClose {
     // Default interval for cache persistence is 1000ms. So after 1000ms, both 
the persistence files
     // should exist.
     assertTrue(new File(testDir + "/bucket.persistence").exists());
-    assertTrue(new File(testDir + "/prefetch.persistence").exists());
 
     // Stop the RS
     cluster.stopRegionServer(0);
     LOG.info("Stopped Region Server 0.");
     Thread.sleep(1000);
     assertTrue(new File(testDir + "/bucket.persistence").exists());
-    assertTrue(new File(testDir + "/prefetch.persistence").exists());
 
     // Start the RS and validate
     cluster.startRegionServer();
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertFalse(new File(testDir + "/bucket.persistence").exists());
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
new file mode 100644
index 00000000000..e4330308243
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestPrefetchWithBucketCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length 
- 2;
+  private static final int DATA_BLOCK_SIZE = 2048;
+  private static final int NUM_KV = 100;
+
+  private Configuration conf;
+  private CacheConfig cacheConf;
+  private FileSystem fs;
+  private BlockCache blockCache;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
+    fs = HFileSystem.get(conf);
+    File testDir = new File(name.getMethodName());
+    testDir.mkdir();
+    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + 
"/bucket.cache");
+    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    cacheConf = new CacheConfig(conf, blockCache);
+  }
+
+  @After
+  public void tearDown() {
+    File cacheFile = new File(name.getMethodName() + "/bucket.cache");
+    File dir = new File(name.getMethodName());
+    cacheFile.delete();
+    dir.delete();
+  }
+
+  @Test
+  public void testPrefetchDoesntOverwork() throws Exception {
+    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork");
+    // Prefetches the file blocks
+    LOG.debug("First read should prefetch the blocks.");
+    readStoreFile(storeFile);
+    BucketCache bc = 
BucketCache.getBuckedCacheFromCacheConfig(cacheConf).get();
+    // Our file should have 6 DATA blocks. We should wait for all of them to 
be cached
+    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
+    Map<BlockCacheKey, BucketEntry> snapshot = 
ImmutableMap.copyOf(bc.getBackingMap());
+    // Reads file again and check we are not prefetching it again
+    LOG.debug("Second read, no prefetch should happen here.");
+    readStoreFile(storeFile);
+    // Makes sure the cache hasn't changed
+    snapshot.entrySet().forEach(e -> {
+      BucketEntry entry = bc.getBackingMap().get(e.getKey());
+      assertNotNull(entry);
+      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
+    });
+    // forcibly removes first block from the bc backing map, in order to cause 
it to be cached again
+    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
+    LOG.debug("removing block {}", key);
+    bc.getBackingMap().remove(key);
+    bc.getFullyCachedFiles().remove(storeFile.getName());
+    assertTrue(snapshot.size() > bc.getBackingMap().size());
+    LOG.debug("Third read should prefetch again, as we removed one block for 
the file.");
+    readStoreFile(storeFile);
+    Waiter.waitFor(conf, 300, () -> snapshot.size() == 
bc.getBackingMap().size());
+    assertTrue(snapshot.get(key).getCachedTime() < 
bc.getBackingMap().get(key).getCachedTime());
+  }
+
+  private void readStoreFile(Path storeFilePath) throws Exception {
+    readStoreFile(storeFilePath, (r, o) -> {
+      HFileBlock block = null;
+      try {
+        block = r.readBlock(o, -1, false, true, false, true, null, null);
+      } catch (IOException e) {
+        fail(e.getMessage());
+      }
+      return block;
+    }, (key, block) -> {
+      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+      if (
+        block.getBlockType() == BlockType.DATA || block.getBlockType() == 
BlockType.ROOT_INDEX
+          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+      ) {
+        assertTrue(isCached);
+      }
+    });
+  }
+
+  private void readStoreFile(Path storeFilePath,
+    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
+    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception 
{
+    // Open the file
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, 
true, conf);
+
+    while (!reader.prefetchComplete()) {
+      // Sleep for a bit
+      Thread.sleep(1000);
+    }
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      HFileBlock block = readFunction.apply(reader, offset);
+      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), 
offset);
+      validationFunction.accept(blockCacheKey, block);
+      offset += block.getOnDiskSizeWithHeader();
+    }
+  }
+
+  private Path writeStoreFile(String fname) throws IOException {
+    HFileContext meta = new 
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+    return writeStoreFile(fname, meta);
+  }
+
+  private Path writeStoreFile(String fname, HFileContext context) throws 
IOException {
+    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
+      .withOutputDir(storeFileParentDir).withFileContext(context).build();
+    Random rand = ThreadLocalRandom.current();
+    final int rowLen = 32;
+    for (int i = 0; i < NUM_KV; ++i) {
+      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
+      byte[] v = RandomKeyValueUtil.randomValue(rand);
+      int cfLen = rand.nextInt(k.length - rowLen + 1);
+      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + 
cfLen,
+        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 
0, v.length);
+      sfw.append(kv);
+    }
+
+    sfw.close();
+    return sfw.getPath();
+  }
+
+  public static KeyValue.Type generateKeyType(Random rand) {
+    if (rand.nextBoolean()) {
+      // Let's make half of KVs puts.
+      return KeyValue.Type.Put;
+    } else {
+      KeyValue.Type keyType = KeyValue.Type.values()[1 + 
rand.nextInt(NUM_VALID_KEY_TYPES)];
+      if (keyType == KeyValue.Type.Minimum || keyType == 
KeyValue.Type.Maximum) {
+        throw new RuntimeException("Generated an invalid key type: " + keyType 
+ ". "
+          + "Probably the layout of KeyValue.Type has changed.");
+      }
+      return keyType;
+    }
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index ad381a665c3..0cbafedc7c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
@@ -298,12 +299,17 @@ public class TestBucketCache {
     testRetrievalUtils(testDir, ioEngineName);
     int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
     String persistencePath = testDir + "/bucket.persistence";
-    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
-      smallBucketSizes, writeThreads, writerQLen, persistencePath);
-    assertFalse(new File(persistencePath).exists());
-    assertEquals(0, bucketCache.getAllocator().getUsedSize());
-    assertEquals(0, bucketCache.backingMap.size());
-    HBASE_TESTING_UTILITY.cleanupTestDir();
+    BucketCache bucketCache = null;
+    try {
+      bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
+        smallBucketSizes, writeThreads, writerQLen, persistencePath);
+      assertFalse(new File(persistencePath).exists());
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      bucketCache.shutdown();
+      HBASE_TESTING_UTILITY.cleanupTestDir();
+    }
   }
 
   @Test
@@ -319,21 +325,28 @@ public class TestBucketCache {
     final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
     testRetrievalUtils(testDir, ioEngineName);
     int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
-    String persistencePath = testDir + "/bucket.persistence";
-    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
-      smallBucketSizes, writeThreads, writerQLen, persistencePath);
-    assertFalse(new File(persistencePath).exists());
-    assertEquals(0, bucketCache.getAllocator().getUsedSize());
-    assertEquals(0, bucketCache.backingMap.size());
-    HBASE_TESTING_UTILITY.cleanupTestDir();
+    String persistencePath = testDir + "/bucket.persistence" + 
EnvironmentEdgeManager.currentTime();
+    BucketCache bucketCache = null;
+    try {
+      bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
+        smallBucketSizes, writeThreads, writerQLen, persistencePath);
+      assertFalse(new File(persistencePath).exists());
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      bucketCache.shutdown();
+      HBASE_TESTING_UTILITY.cleanupTestDir();
+    }
   }
 
   private void testRetrievalUtils(Path testDir, String ioEngineName)
     throws IOException, InterruptedException {
-    final String persistencePath = testDir + "/bucket.persistence";
-    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
-      constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+    final String persistencePath =
+      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
+    BucketCache bucketCache = null;
     try {
+      bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
+        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
       long usedSize = bucketCache.getAllocator().getUsedSize();
       assertEquals(0, usedSize);
       HFileBlockPair[] blocks = 
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
@@ -353,7 +366,9 @@ public class TestBucketCache {
       assertFalse(new File(persistencePath).exists());
       assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
     } finally {
-      bucketCache.shutdown();
+      if (bucketCache != null) {
+        bucketCache.shutdown();
+      }
     }
     assertTrue(new File(persistencePath).exists());
   }
@@ -382,12 +397,17 @@ public class TestBucketCache {
     testRetrievalUtils(testDirInitial, ioEngineName);
     int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
     String persistencePath = testDirInitial + "/bucket.persistence";
-    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
-      smallBucketSizes, writeThreads, writerQLen, persistencePath);
-    assertFalse(new File(persistencePath).exists());
-    assertEquals(0, bucketCache.getAllocator().getUsedSize());
-    assertEquals(0, bucketCache.backingMap.size());
-    HBASE_TESTING_UTILITY.cleanupTestDir();
+    BucketCache bucketCache = null;
+    try {
+      bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
+        smallBucketSizes, writeThreads, writerQLen, persistencePath);
+      assertFalse(new File(persistencePath).exists());
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      bucketCache.shutdown();
+      HBASE_TESTING_UTILITY.cleanupTestDir();
+    }
   }
 
   @Test
@@ -572,7 +592,7 @@ public class TestBucketCache {
     // This number is picked because it produces negative output if the values isn't ensured to be
     // positive. See HBASE-18757 for more information.
     long testValue = 549888460800L;
-    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true, (entry) -> {
+    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
       return ByteBuffAllocator.NONE;
     }, ByteBuffAllocator.HEAP);
     assertEquals(testValue, bucketEntry.offset());
@@ -701,8 +721,8 @@ public class TestBucketCache {
       HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
     HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
       HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
-    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
-    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
+    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
+    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);
 
     assertFalse(cache.containsKey(key1));
     assertNull(cache.putIfAbsent(key1, re1));
@@ -749,7 +769,7 @@ public class TestBucketCache {
     BucketAllocator allocator = new BucketAllocator(availableSpace, null);
 
     BlockCacheKey key = new BlockCacheKey("dummy", 1L);
-    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
+    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);
 
     Assert.assertEquals(0, allocator.getUsedSize());
     try {
@@ -768,13 +788,14 @@ public class TestBucketCache {
    */
   @Test
   public void testFreeBucketEntryRestoredFromFile() throws Exception {
+    BucketCache bucketCache = null;
     try {
       final Path dataTestDir = createAndGetTestDir();
 
       String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
       String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
 
-      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
         constructedBlockSizes, writeThreads, writerQLen, persistencePath);
       long usedByteSize = bucketCache.getAllocator().getUsedSize();
       assertEquals(0, usedByteSize);
@@ -809,19 +830,21 @@ public class TestBucketCache {
       assertEquals(0, bucketCache.getAllocator().getUsedSize());
       assertEquals(0, bucketCache.backingMap.size());
     } finally {
+      bucketCache.shutdown();
       HBASE_TESTING_UTILITY.cleanupTestDir();
     }
   }
 
   @Test
   public void testBlockAdditionWaitWhenCache() throws Exception {
+    BucketCache bucketCache = null;
     try {
       final Path dataTestDir = createAndGetTestDir();
 
       String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
       String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
 
-      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
         constructedBlockSizes, 1, 1, persistencePath);
       long usedByteSize = bucketCache.getAllocator().getUsedSize();
       assertEquals(0, usedByteSize);
@@ -864,6 +887,9 @@ public class TestBucketCache {
       assertEquals(0, bucketCache.getAllocator().getUsedSize());
       assertEquals(0, bucketCache.backingMap.size());
     } finally {
+      if (bucketCache != null) {
+        bucketCache.shutdown();
+      }
       HBASE_TESTING_UTILITY.cleanupTestDir();
     }
   }
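
The recurring change in the TestBucketCache hunks above is wrapping each cache instantiation in try/finally so the cache is always shut down and the test directory cleaned even when an assertion fails. A minimal sketch of that pattern, reusing the fixture names that appear in these hunks (ioEngineName, capacitySize, constructedBlockSize, etc. are assumed to be the test's existing fields):

    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      // assertions against the freshly created cache go here
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown(); // always release the cache, even if an assertion above failed
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
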
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
index dbd3d7f8664..bd69f28e1ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
 import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -85,7 +84,6 @@ public class TestBucketCachePersister {
   }
 
   public BucketCache setupBucketCache(Configuration conf) throws IOException {
-    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, (testDir + "/prefetch.persistence"));
     BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
       constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
       testDir + "/bucket.persistence", 60 * 1000, conf);
@@ -111,9 +109,7 @@ public class TestBucketCachePersister {
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
     readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
     Thread.sleep(bucketCachePersistInterval);
-    assertTrue(new File(testDir + "/prefetch.persistence").exists());
     assertTrue(new File(testDir + "/bucket.persistence").exists());
-    assertTrue(new File(testDir + "/prefetch.persistence").delete());
     assertTrue(new File(testDir + "/bucket.persistence").delete());
     cleanupBucketCache(bucketCache);
   }
@@ -128,7 +124,6 @@ public class TestBucketCachePersister {
     // Load Cache
     Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertFalse(new File(testDir + "/bucket.persistence").exists());
     cleanupBucketCache(bucketCache);
   }
@@ -144,10 +139,10 @@ public class TestBucketCachePersister {
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1);
     Thread.sleep(500);
     // Evict Blocks from cache
+    assertTrue(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName()));
     BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey();
-    assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
     bucketCache1.evictBlock(bucketCacheKey);
-    assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
+    assertFalse(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName()));
   }
 
   public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index 820e91aa6e8..b42e7be804d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -49,7 +49,7 @@ public class TestByteBufferIOEngine {
     private long off;
 
     MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
-      super(offset & 0xFF00, length, 0, false, (entry) -> {
+      super(offset & 0xFF00, length, length, 0, false, (entry) -> {
         return ByteBuffAllocator.NONE;
       }, allocator);
       this.off = offset;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
index 771ab0158f6..f15874bc61c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
 import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -106,8 +105,6 @@ public class TestPrefetchPersistence {
     conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
     testDir = TEST_UTIL.getDataTestDir();
     TEST_UTIL.getTestFileSystem().mkdirs(testDir);
-    prefetchPersistencePath = testDir + "/prefetch.persistence";
-    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, prefetchPersistencePath);
     fs = HFileSystem.get(conf);
   }
 
@@ -132,10 +129,10 @@ public class TestPrefetchPersistence {
 
     bucketCache.shutdown();
     assertTrue(new File(testDir + "/bucket.persistence").exists());
-    assertTrue(new File(testDir + "/prefetch.persistence").exists());
     bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
       constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
       testDir + "/bucket.persistence", 60 * 1000, conf);
+    cacheConf = new CacheConfig(conf, bucketCache);
     assertFalse(new File(testDir + "/bucket.persistence").exists());
     assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertTrue(usedSize != 0);
@@ -148,9 +145,9 @@ public class TestPrefetchPersistence {
 
   public void closeStoreFile(Path path) throws Exception {
     HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
-    assertTrue(PrefetchExecutor.isFilePrefetched(path.getName()));
+    assertTrue(bucketCache.fullyCachedFiles.containsKey(path.getName()));
     reader.close(true);
-    assertFalse(PrefetchExecutor.isFilePrefetched(path.getName()));
+    assertFalse(bucketCache.fullyCachedFiles.containsKey(path.getName()));
   }
 
   public void readStoreFile(Path storeFilePath, long offset) throws Exception {
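
With PREFETCH_PERSISTENCE_PATH_KEY removed, the tests above no longer ask PrefetchExecutor whether a file has been prefetched; the BucketCache itself tracks fully cached files. A rough sketch of the new assertion style, assuming the same bucketCache, storeFile and bucketCacheKey fixtures used in these hunks:

    // once prefetch of the file has completed, the cache tracks it by file name
    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
    // evicting one of the file's blocks removes it from the fully-cached set
    bucketCache.evictBlock(bucketCacheKey);
    assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
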
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
index 0e777a4a7b9..58d9385f57e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
@@ -90,7 +90,7 @@ public class TestRAMCache {
     MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
       ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
       new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
-    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false);
+    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false);
 
     Assert.assertNull(cache.putIfAbsent(key, re));
     Assert.assertEquals(cache.putIfAbsent(key, re), re);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index 3b2b9961b2b..6fdea844aa3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedWriter;
+import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
 import java.nio.file.FileSystems;
@@ -32,12 +36,15 @@ import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -147,16 +154,15 @@ public class TestVerifyBucketCacheFile {
 
   @Test
   public void testRetrieveFromFileAfterDelete() throws Exception {
-
     HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
     Path testDir = TEST_UTIL.getDataTestDir();
     TEST_UTIL.getTestFileSystem().mkdirs(testDir);
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
-
-    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
-      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-      testDir + "/bucket.persistence", 60 * 1000, conf);
+    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
+    BucketCache bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
 
     long usedSize = bucketCache.getAllocator().getUsedSize();
     assertEquals(0, usedSize);
@@ -171,14 +177,13 @@ public class TestVerifyBucketCacheFile {
     // Shutdown BucketCache
     bucketCache.shutdown();
     // Delete the persistence file
-    final java.nio.file.Path mapFile =
-      FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
-    assertTrue(Files.deleteIfExists(mapFile));
+    File mapFile = new File(mapFileName);
+    assertTrue(mapFile.delete());
     Thread.sleep(350);
     // Create BucketCache
-    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
-      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-      testDir + "/bucket.persistence", 60 * 1000, conf);
+    bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
     assertEquals(0, bucketCache.getAllocator().getUsedSize());
     assertEquals(0, bucketCache.backingMap.size());
   }
@@ -232,9 +237,15 @@ public class TestVerifyBucketCacheFile {
   /**
   * Test whether BucketCache is started normally after modifying the cache file's last modified
   * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
-   * to file. Then Restart BucketCache after modify cache file's last modified time, and it can't
-   * restore cache from file, the cache file and persistence file would be deleted before
-   * BucketCache start normally.
+   * to file. Then restart BucketCache after modifying the cache file's last modified time. HBASE-XXXX
+   * changed cache persistence so that we now store an extra 8 bytes at the end of each block in the
+   * cache, representing the nanosecond time at which the block was cached. So in the event the cache
+   * file fails checksum verification during loading, we go through all the cached blocks in the
+   * cache map and compare the cached time long between what is in the map and the cache file. If
+   * that check fails, we pull the cache key entry out of the map. Since in this test we are only
+   * modifying the access time to induce a checksum error, the cache file content is still valid and
+   * the extra verification should confirm that all cache keys in the map are still recoverable from
+   * the cache.
    * @throws Exception the exception
    */
   @Test
@@ -249,6 +260,8 @@ public class TestVerifyBucketCacheFile {
     long usedSize = bucketCache.getAllocator().getUsedSize();
     assertEquals(0, usedSize);
 
+    Pair<String, Long> myPair = new Pair<>();
+
     CacheTestUtils.HFileBlockPair[] blocks =
       CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
     // Add blocks
@@ -257,6 +270,8 @@ public class TestVerifyBucketCacheFile {
     }
     usedSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedSize);
+    long blockCount = bucketCache.backingMap.size();
+    assertNotEquals(0, blockCount);
     // persist cache to file
     bucketCache.shutdown();
 
@@ -268,9 +283,64 @@ public class TestVerifyBucketCacheFile {
     bucketCache =
       new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
         constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
-    assertEquals(0, bucketCache.getAllocator().getUsedSize());
-    assertEquals(0, bucketCache.backingMap.size());
+    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+    assertEquals(blockCount, bucketCache.backingMap.size());
+
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  /**
+   * When using the persistent bucket cache, a crash may happen between persisting the backing map
+   * and syncing new blocks to the cache file itself, leaving the cache keys and the cached data in
+   * an inconsistent state. This test makes sure the cache keys are updated accordingly, and that
+   * the keys that are still valid can retrieve their related block data from the cache without any
+   * corruption.
+   * @throws Exception the exception
+   */
+  @Test
+  public void testBucketCacheRecovery() throws Exception {
+    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+    CacheTestUtils.HFileBlockPair[] blocks =
+      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
+    // Add three blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    // saves the current state
+    bucketCache.persistToFile();
+    // evicts first block
+    bucketCache.evictBlock(blocks[0].getBlockName());
+
+    // now adds a fourth block to bucket cache
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+    // Creates new bucket cache instance without persisting to file after evicting first block
+    // and caching fourth block. So the bucket cache file has only the last three blocks,
+    // but backing map (containing cache keys) was persisted when first three blocks
+    // were in the cache. So the state on this recovery is:
+    // - Backing map: [block0, block1, block2]
+    // - Cache: [block1, block2, block3]
+    // Therefore, this bucket cache would be able to recover only block1 and block2.
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
 
+    assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
+    assertEquals(blocks[1].getBlock(),
+      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+    assertEquals(blocks[2].getBlock(),
+      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+    assertEquals(2, newBucketCache.backingMap.size());
     TEST_UTIL.cleanupTestDir();
   }
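
To summarize the recovery scenario testBucketCacheRecovery exercises (and which the cached-time verification described in the javadoc above supports): on restart, keys in the persisted backing map whose data never made it to the cache file must be dropped, while keys whose data is present stay readable. The following is an illustrative, self-contained sketch of that pruning idea only, not the actual BucketCache internals; the class and variable names (RecoverySketch, persistedKeys, syncedToFile) are hypothetical:

    import java.util.HashSet;
    import java.util.Set;

    public class RecoverySketch {
      public static void main(String[] args) {
        // cache keys persisted in the backing map before the simulated crash
        Set<String> persistedKeys = new HashSet<>(Set.of("block0", "block1", "block2"));
        // blocks whose bytes were actually synced to the cache file afterwards
        Set<String> syncedToFile = Set.of("block1", "block2", "block3");

        // on recovery, keep only keys whose data is really present in the file
        persistedKeys.retainAll(syncedToFile);

        // matches the test's expectation: only block1 and block2 survive
        System.out.println(persistedKeys); // prints block1 and block2 (order unspecified)
      }
    }
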
 
