This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
     new 7dfc2f3d05a  HBASE-28004 Persistent cache map can get corrupt if crash happens midway through the write (#5341)
7dfc2f3d05a is described below

commit 7dfc2f3d05a86b33fe1da66a1c941d6ce714b989
Author: Wellington Ramos Chevreuil <wchevre...@apache.org>
AuthorDate: Wed Aug 23 10:17:21 2023 +0100

    HBASE-28004 Persistent cache map can get corrupt if crash happens midway through the write (#5341)

    Signed-off-by: Ankit Singhal <an...@apache.org>
    Reviewed-by: Rahul Agarkar <rahul.agar...@gmail.com>
    Change-Id: I577990e1460d6fdc137e1dfcd26e85fed373ed6e
---
 .../src/main/protobuf/BucketCacheEntry.proto        |   3 +
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java   |   2 -
 .../hadoop/hbase/io/hfile/HFilePreadReader.java     |  41 +++-
 .../hadoop/hbase/io/hfile/PrefetchExecutor.java     |  89 +--------
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java   | 167 ++++++++++++----
 .../hadoop/hbase/io/hfile/bucket/BucketEntry.java   |  28 ++-
 .../hbase/io/hfile/bucket/BucketProtoUtils.java     |  10 +-
 .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java  |  37 +++-
 .../hfile/TestBlockEvictionOnRegionMovement.java    |   1 -
 .../hadoop/hbase/io/hfile/TestPrefetchRSClose.java  |   4 -
 .../io/hfile/TestPrefetchWithBucketCache.java       | 211 ++++++++++++++++++++
 .../hbase/io/hfile/bucket/TestBucketCache.java      | 219 ++++++++++++++++-----
 .../io/hfile/bucket/TestBucketCachePersister.java   |   9 +-
 .../io/hfile/bucket/TestByteBufferIOEngine.java     |   2 +-
 .../io/hfile/bucket/TestPrefetchPersistence.java    |   9 +-
 .../hadoop/hbase/io/hfile/bucket/TestRAMCache.java  |   2 +-
 .../io/hfile/bucket/TestVerifyBucketCacheFile.java  | 101 ++++++++--
 17 files changed, 717 insertions(+), 218 deletions(-)

diff --git a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
index 038c6ca3f04..ae1980fe51e 100644
--- a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
@@ -32,6 +32,7 @@ message BucketCacheEntry {
   map<int32, string> deserializers = 4;
   required BackingMap backing_map = 5;
   optional bytes checksum = 6;
+  map<string, bool> prefetched_files = 7;
 }
 
 message BackingMap {
@@ -71,6 +72,8 @@ message BucketEntry {
   required int64 access_counter = 3;
   required int32 deserialiser_index = 4;
   required BlockPriority priority = 5;
+  required int64 cachedTime = 6;
+  optional int32 disk_size_with_header = 7;
 }
 
 enum BlockPriority {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 1a9ff7cef33..d786e5bfa23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -93,8 +93,6 @@ public class CacheConfig {
   public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
     "hbase.hfile.drop.behind.compaction";
 
-  public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";
-
   /**
    * Configuration key to set interval for persisting bucket cache to disk.
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 2c71ce9f484..f9c0ae59242 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.IOException; +import java.util.Optional; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +39,14 @@ public class HFilePreadReader extends HFileReaderImpl { public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf, Configuration conf) throws IOException { super(context, fileInfo, cacheConf, conf); + final MutableBoolean fileAlreadyCached = new MutableBoolean(false); + BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent(bc -> fileAlreadyCached + .setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : true)); // Prefetch file blocks upon open if requested - if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()) { + if ( + cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() + && !fileAlreadyCached.booleanValue() + ) { PrefetchExecutor.request(path, new Runnable() { @Override public void run() { @@ -55,12 +65,36 @@ public class HFilePreadReader extends HFileReaderImpl { if (LOG.isTraceEnabled()) { LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end)); } + Optional<BucketCache> bucketCacheOptional = + BucketCache.getBuckedCacheFromCacheConfig(cacheConf); // Don't use BlockIterator here, because it's designed to read load-on-open section. long onDiskSizeOfNextBlock = -1; while (offset < end) { if (Thread.interrupted()) { break; } + // BucketCache can be persistent and resilient to restarts, so we check first if the + // block exists on its in-memory index, if so, we just update the offset and move on + // to the next block without actually going read all the way to the cache. + if (bucketCacheOptional.isPresent()) { + BucketCache cache = bucketCacheOptional.get(); + BlockCacheKey cacheKey = new BlockCacheKey(name, offset); + BucketEntry entry = cache.getBackingMap().get(cacheKey); + if (entry != null) { + cacheKey = new BlockCacheKey(name, offset); + entry = cache.getBackingMap().get(cacheKey); + if (entry == null) { + LOG.debug("No cache key {}, we'll read and cache it", cacheKey); + } else { + offset += entry.getOnDiskSizeWithHeader(); + LOG.debug("Found cache key {}. Skipping prefetch, the block is already cached.", + cacheKey); + continue; + } + } else { + LOG.debug("No entry in the backing map for cache key {}", cacheKey); + } + } // Perhaps we got our block from cache? 
Unlikely as this may be, if it happens, then // the internal-to-hfileblock thread local which holds the overread that gets the // next header, will not have happened...so, pass in the onDiskSize gotten from the @@ -77,12 +111,15 @@ public class HFilePreadReader extends HFileReaderImpl { block.release(); } } + BucketCache.getBuckedCacheFromCacheConfig(cacheConf) + .ifPresent(bc -> bc.fileCacheCompleted(path.getName())); + } catch (IOException e) { // IOExceptions are probably due to region closes (relocation, etc.) if (LOG.isTraceEnabled()) { LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end), e); } - } catch (Exception e) { + } catch (Throwable e) { // Other exceptions are interesting LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index d3064e066a1..02fbc12e85c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Future; @@ -42,8 +37,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; - @InterfaceAudience.Private public final class PrefetchExecutor { @@ -51,16 +44,12 @@ public final class PrefetchExecutor { /** Futures for tracking block prefetch activity */ private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); - /** Set of files for which prefetch is completed */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") - private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>(); /** Executor pool shared among all HFiles for block prefetch */ private static final ScheduledExecutorService prefetchExecutorPool; /** Delay before beginning prefetch */ private static final int prefetchDelayMillis; /** Variation in prefetch delay times, to mitigate stampedes */ private static final float prefetchDelayVariation; - static String prefetchedFileListPath; static { // Consider doing this on demand with a configuration passed in rather // than in a static initializer. 
@@ -90,13 +79,6 @@ public final class PrefetchExecutor { + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")"); public static void request(Path path, Runnable runnable) { - if (prefetchCompleted != null) { - if (isFilePrefetched(path.getName())) { - LOG.info( - "File has already been prefetched before the restart, so skipping prefetch : " + path); - return; - } - } if (!prefetchPathExclude.matcher(path.toString()).find()) { long delay; if (prefetchDelayMillis > 0) { @@ -122,8 +104,9 @@ public final class PrefetchExecutor { public static void complete(Path path) { prefetchFutures.remove(path); - prefetchCompleted.put(path.getName(), true); - LOG.debug("Prefetch completed for {}", path.getName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Prefetch completed for {}", path.getName()); + } } public static void cancel(Path path) { @@ -134,8 +117,6 @@ public final class PrefetchExecutor { prefetchFutures.remove(path); LOG.debug("Prefetch cancelled for {}", path); } - LOG.debug("Removing filename from the prefetched persistence list: {}", path.getName()); - removePrefetchedFileWhileEvict(path.getName()); } public static boolean isCompleted(Path path) { @@ -146,70 +127,6 @@ public final class PrefetchExecutor { return true; } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", - justification = "false positive, try-with-resources ensures close is called.") - public static void persistToFile(String path) throws IOException { - prefetchedFileListPath = path; - if (prefetchedFileListPath == null) { - LOG.info("Exception while persisting prefetch!"); - throw new IOException("Error persisting prefetched HFiles set!"); - } - if (!prefetchCompleted.isEmpty()) { - try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) { - PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos); - } - } - } - - public static void retrieveFromFile(String path) throws IOException { - prefetchedFileListPath = path; - File prefetchPersistenceFile = new File(prefetchedFileListPath); - if (!prefetchPersistenceFile.exists()) { - LOG.warn("Prefetch persistence file does not exist!"); - return; - } - LOG.info("Retrieving from prefetch persistence file " + path); - assert (prefetchedFileListPath != null); - try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) { - PersistentPrefetchProtos.PrefetchedHfileName proto = - PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis); - Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap(); - prefetchCompleted.putAll(protoPrefetchedFilesMap); - } - } - - private static FileInputStream deleteFileOnClose(final File file) throws IOException { - return new FileInputStream(file) { - private File myFile; - - private FileInputStream init(File file) { - myFile = file; - return this; - } - - @Override - public void close() throws IOException { - if (myFile == null) { - return; - } - - super.close(); - if (!myFile.delete()) { - throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath()); - } - myFile = null; - } - }.init(file); - } - - public static void removePrefetchedFileWhileEvict(String hfileName) { - prefetchCompleted.remove(hfileName); - } - - public static boolean isFilePrefetched(String hfileName) { - return prefetchCompleted.containsKey(hfileName); - } - private PrefetchExecutor() { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 1e8e6cd7c23..5768b5b03d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; -import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY; import java.io.File; import java.io.FileInputStream; @@ -32,6 +31,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.Optional; import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ -53,6 +53,7 @@ import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.io.ByteBuffAllocator; @@ -63,12 +64,13 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; import org.apache.hadoop.hbase.io.hfile.BlockPriority; import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheStats; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; @@ -138,8 +140,14 @@ public class BucketCache implements BlockCache, HeapSize { // Store the block in this map before writing it to cache transient final RAMCache ramCache; + // In this map, store the block's meta data like offset, length - transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap; + transient Map<BlockCacheKey, BucketEntry> backingMap; + + /** Set of files for which prefetch is completed */ + final Map<String, Boolean> fullyCachedFiles = new ConcurrentHashMap<>(); + + private BucketCachePersister cachePersister; /** * Flag if the cache is enabled or not... 
We shut it off if there are IO errors for some time, so @@ -172,9 +180,6 @@ public class BucketCache implements BlockCache, HeapSize { private static final int DEFAULT_CACHE_WAIT_TIME = 50; private final BucketCacheStats cacheStats = new BucketCacheStats(); - - /** BucketCache persister thread */ - private BucketCachePersister cachePersister; private final String persistencePath; static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false); private final long cacheCapacity; @@ -235,8 +240,6 @@ public class BucketCache implements BlockCache, HeapSize { /** In-memory bucket size */ private float memoryFactor; - private String prefetchedFileListPath; - private long bucketcachePersistInterval; private static final String FILE_VERIFY_ALGORITHM = @@ -283,7 +286,6 @@ public class BucketCache implements BlockCache, HeapSize { this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); this.queueAdditionWaitTime = conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); - this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY); this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); sanityCheckConfigs(); @@ -309,11 +311,15 @@ public class BucketCache implements BlockCache, HeapSize { this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); - if (ioEngine.isPersistent() && persistencePath != null) { - startBucketCachePersisterThread(); + if (isCachePersistent()) { + if (ioEngine instanceof FileIOEngine) { + startBucketCachePersisterThread(); + } try { retrieveFromFile(bucketSizes); } catch (IOException ioex) { + backingMap.clear(); + fullyCachedFiles.clear(); LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex); } } @@ -418,7 +424,7 @@ public class BucketCache implements BlockCache, HeapSize { } public boolean isCachePersistenceEnabled() { - return (prefetchedFileListPath != null) && (persistencePath != null); + return persistencePath != null; } /** @@ -493,8 +499,8 @@ public class BucketCache implements BlockCache, HeapSize { } LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); // Stuff the entry into the RAM cache so it can get drained to the persistent store - RAMQueueEntry re = - new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); + RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), + inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine); /** * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same * key in ramCache, the heap size of bucket cache need to update if replacing entry from @@ -578,6 +584,12 @@ public class BucketCache implements BlockCache, HeapSize { } return cachedBlock; } + } catch (HBaseIOException hioex) { + // When using file io engine persistent cache, + // the cache map state might differ from the actual cache. 
If we reach this block, + // we should remove the cache key entry from the backing map + backingMap.remove(key); + LOG.debug("Failed to fetch block for cache key: {}.", key, hioex); } catch (IOException ioex) { LOG.error("Failed reading block " + key + " from bucket cache", ioex); checkIOErrorIsTolerated(); @@ -605,13 +617,15 @@ public class BucketCache implements BlockCache, HeapSize { cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); } if (ioEngine.isPersistent()) { - if (prefetchedFileListPath != null) { - PrefetchExecutor.removePrefetchedFileWhileEvict(cacheKey.getHfileName()); - } + fullyCachedFiles.remove(cacheKey.getHfileName()); setCacheInconsistent(true); } } + public void fileCacheCompleted(String fileName) { + fullyCachedFiles.put(fileName, true); + } + /** * Free the {{@link BucketEntry} actually,which could only be invoked when the * {@link BucketEntry#refCnt} becoming 0. @@ -1241,18 +1255,24 @@ public class BucketCache implements BlockCache, HeapSize { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", justification = "false positive, try-with-resources ensures close is called.") void persistToFile() throws IOException { - if (!ioEngine.isPersistent()) { + if (!isCachePersistent()) { throw new IOException("Attempt to persist non-persistent cache mappings!"); } - try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) { + File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); + try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { fos.write(ProtobufMagic.PB_MAGIC); BucketProtoUtils.toPB(this).writeDelimitedTo(fos); } - if (prefetchedFileListPath != null) { - PrefetchExecutor.persistToFile(prefetchedFileListPath); + if (!tempPersistencePath.renameTo(new File(persistencePath))) { + LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if " + + "RS crashes/restarts before we successfully checkpoint again."); } } + private boolean isCachePersistent() { + return ioEngine.isPersistent() && persistencePath != null; + } + /** * @see #persistToFile() */ @@ -1262,9 +1282,6 @@ public class BucketCache implements BlockCache, HeapSize { return; } assert !cacheEnabled; - if (prefetchedFileListPath != null) { - PrefetchExecutor.retrieveFromFile(prefetchedFileListPath); - } try (FileInputStream in = deleteFileOnClose(persistenceFile)) { int pblen = ProtobufMagic.lengthOfPBMagic(); @@ -1343,16 +1360,37 @@ public class BucketCache implements BlockCache, HeapSize { } private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { + backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), + this::createRecycler); + fullyCachedFiles.clear(); + fullyCachedFiles.putAll(proto.getPrefetchedFilesMap()); if (proto.hasChecksum()) { - ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), - algorithm); + try { + ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), + algorithm); + } catch (IOException e) { + LOG.warn("Checksum for cache file failed. " + + "We need to validate each cache key in the backing map. 
This may take some time..."); + long startTime = EnvironmentEdgeManager.currentTime(); + int totalKeysOriginally = backingMap.size(); + for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) { + try { + ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); + } catch (IOException e1) { + LOG.debug("Check for key {} failed. Removing it from map.", keyEntry.getKey()); + backingMap.remove(keyEntry.getKey()); + fullyCachedFiles.remove(keyEntry.getKey().getHfileName()); + } + } + LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", + totalKeysOriginally, backingMap.size(), + (EnvironmentEdgeManager.currentTime() - startTime)); + } } else { // if has not checksum, it means the persistence file is old format LOG.info("Persistent file is old format, it does not support verifying file integrity!"); } verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); - backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), - this::createRecycler); } /** @@ -1388,6 +1426,7 @@ public class BucketCache implements BlockCache, HeapSize { if (!ioEngine.isPersistent() || persistencePath == null) { // If persistent ioengine and a path, we will serialize out the backingMap. this.backingMap.clear(); + this.fullyCachedFiles.clear(); } } @@ -1402,7 +1441,9 @@ public class BucketCache implements BlockCache, HeapSize { LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" + persistencePath); if (ioEngine.isPersistent() && persistencePath != null) { - cachePersister.interrupt(); + if (cachePersister != null) { + cachePersister.interrupt(); + } try { join(); persistToFile(); @@ -1414,6 +1455,17 @@ public class BucketCache implements BlockCache, HeapSize { } } + /** + * Needed mostly for UTs that might run in the same VM and create different BucketCache instances + * on different UT methods. 
+ */ + @Override + protected void finalize() { + if (cachePersister != null && !cachePersister.isInterrupted()) { + cachePersister.interrupt(); + } + } + @Override public CacheStats getStats() { return cacheStats; @@ -1470,7 +1522,7 @@ public class BucketCache implements BlockCache, HeapSize { */ @Override public int evictBlocksByHfileName(String hfileName) { - PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName); + this.fullyCachedFiles.remove(hfileName); Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true); @@ -1541,12 +1593,15 @@ public class BucketCache implements BlockCache, HeapSize { private final Cacheable data; private long accessCounter; private boolean inMemory; + private boolean isCachePersistent; - RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) { + RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, + boolean isCachePersistent) { this.key = bck; this.data = data; this.accessCounter = accessCounter; this.inMemory = inMemory; + this.isCachePersistent = isCachePersistent; } public Cacheable getData() { @@ -1576,12 +1631,19 @@ public class BucketCache implements BlockCache, HeapSize { if (len == 0) { return null; } + if (isCachePersistent && data instanceof HFileBlock) { + len += Long.BYTES; // we need to record the cache time for consistency check in case of + // recovery + } long offset = alloc.allocateBlock(len); boolean succ = false; BucketEntry bucketEntry = null; try { - bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler, - getByteBuffAllocator()); + int diskSizeWithHeader = (data instanceof HFileBlock) + ? ((HFileBlock) data).getOnDiskSizeWithHeader() + : data.getSerializedLength(); + bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, + createRecycler, getByteBuffAllocator()); bucketEntry.setDeserializerReference(data.getDeserializer()); if (data instanceof HFileBlock) { // If an instance of HFileBlock, save on some allocations. @@ -1589,7 +1651,16 @@ public class BucketCache implements BlockCache, HeapSize { ByteBuff sliceBuf = block.getBufferReadOnly(); block.getMetaData(metaBuff); ioEngine.write(sliceBuf, offset); - ioEngine.write(metaBuff, offset + len - metaBuff.limit()); + // adds the cache time after the block and metadata part + if (isCachePersistent) { + ioEngine.write(metaBuff, offset + len - metaBuff.limit() - Long.BYTES); + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(bucketEntry.getCachedTime()); + buffer.rewind(); + ioEngine.write(buffer, (offset + len - Long.BYTES)); + } else { + ioEngine.write(metaBuff, offset + len - metaBuff.limit()); + } } else { // Only used for testing. ByteBuffer bb = ByteBuffer.allocate(len); @@ -1745,6 +1816,10 @@ public class BucketCache implements BlockCache, HeapSize { return memoryFactor; } + public String getPersistencePath() { + return persistencePath; + } + /** * Wrapped the delegate ConcurrentMap with maintaining its block's reference count. 
*/ @@ -1822,4 +1897,28 @@ public class BucketCache implements BlockCache, HeapSize { } } } + + public Map<BlockCacheKey, BucketEntry> getBackingMap() { + return backingMap; + } + + public Map<String, Boolean> getFullyCachedFiles() { + return fullyCachedFiles; + } + + public static Optional<BucketCache> getBuckedCacheFromCacheConfig(CacheConfig cacheConf) { + if (cacheConf.getBlockCache().isPresent()) { + BlockCache bc = cacheConf.getBlockCache().get(); + if (bc instanceof CombinedBlockCache) { + BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); + if (l2 instanceof BucketCache) { + return Optional.of((BucketCache) l2); + } + } else if (bc instanceof BucketCache) { + return Optional.of((BucketCache) bc); + } + } + return Optional.empty(); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java index a04a32bfe64..c93dac8a572 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java @@ -43,13 +43,15 @@ import org.apache.yetus.audience.InterfaceAudience; * bytes gives us 256TB or so. */ @InterfaceAudience.Private -class BucketEntry implements HBaseReferenceCounted { +public class BucketEntry implements HBaseReferenceCounted { // access counter comparator, descending order static final Comparator<BucketEntry> COMPARATOR = Comparator.comparingLong(BucketEntry::getAccessCounter).reversed(); private int offsetBase; private int length; + + private int onDiskSizeWithHeader; private byte offset1; /** @@ -91,24 +93,32 @@ class BucketEntry implements HBaseReferenceCounted { /** * Time this block was cached. Presumes we are created just before we are added to the cache. */ - private final long cachedTime = System.nanoTime(); + private long cachedTime = System.nanoTime(); /** * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt} * becoming 0. NOTICE that {@link ByteBuffAllocator#NONE} could only be used * for test. */ - BucketEntry(long offset, int length, long accessCounter, boolean inMemory, - Function<BucketEntry, Recycler> createRecycler, ByteBuffAllocator allocator) { + BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, + boolean inMemory, Function<BucketEntry, Recycler> createRecycler, ByteBuffAllocator allocator) { + this(offset, length, onDiskSizeWithHeader, accessCounter, System.nanoTime(), inMemory, + createRecycler, allocator); + } + + BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, + long cachedTime, boolean inMemory, Function<BucketEntry, Recycler> createRecycler, + ByteBuffAllocator allocator) { if (createRecycler == null) { throw new IllegalArgumentException("createRecycler could not be null!"); } setOffset(offset); this.length = length; + this.onDiskSizeWithHeader = onDiskSizeWithHeader; this.accessCounter = accessCounter; + this.cachedTime = cachedTime; this.priority = inMemory ? 
BlockPriority.MEMORY : BlockPriority.MULTI; this.refCnt = RefCnt.create(createRecycler.apply(this)); - this.markedAsEvicted = new AtomicBoolean(false); this.allocator = allocator; } @@ -159,10 +169,14 @@ class BucketEntry implements HBaseReferenceCounted { return this.priority; } - long getCachedTime() { + public long getCachedTime() { return cachedTime; } + public int getOnDiskSizeWithHeader() { + return onDiskSizeWithHeader; + } + /** * The {@link BucketCache} will try to release its reference to this BucketEntry many times. we * must make sure the idempotent, otherwise it'll decrease the RPC's reference count in advance, @@ -239,7 +253,7 @@ class BucketEntry implements HBaseReferenceCounted { * also release its refCnt (case.1 will do this) and no other rpc reference, then it will free the * area in bucketAllocator. <br> * 3.evict those block without any rpc reference if cache size exceeded. we'll only free those - * blocks with zero rpc reference count, as the {@link BucketEntry#markStaleAsEvicted()} do. + * blocks with zero rpc reference count. * @return true to indicate we've decreased to zero and do the de-allocation. */ @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index ff4e90b8865..8830e5d3255 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -45,6 +45,7 @@ final class BucketProtoUtils { .setIoClass(cache.ioEngine.getClass().getName()) .setMapClass(cache.backingMap.getClass().getName()) .putAllDeserializers(CacheableDeserializerIdManager.save()) + .putAllPrefetchedFiles(cache.fullyCachedFiles) .setBackingMap(BucketProtoUtils.toPB(cache.backingMap)) .setChecksum(ByteString .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) @@ -99,8 +100,10 @@ final class BucketProtoUtils { private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset()) - .setLength(entry.getLength()).setDeserialiserIndex(entry.deserializerIndex) - .setAccessCounter(entry.getAccessCounter()).setPriority(toPB(entry.getPriority())).build(); + .setCachedTime(entry.getCachedTime()).setLength(entry.getLength()) + .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader()) + .setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter()) + .setPriority(toPB(entry.getPriority())).build(); } private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) { @@ -128,7 +131,8 @@ final class BucketProtoUtils { // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator // which created by RpcServer elegantly. 
BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(), - protoValue.getAccessCounter(), + protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(), + protoValue.getCachedTime(), protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler, ByteBuffAllocator.HEAP); // This is the deserializer that we stored diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 370343b1b25..38f9db04b6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -26,6 +26,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.nio.ByteBuff; @@ -49,6 +50,7 @@ public class FileIOEngine extends PersistentIOEngine { private final long sizePerFile; private final long capacity; + private boolean maintainPersistence; private FileReadAccessor readAccessor = new FileReadAccessor(); private FileWriteAccessor writeAccessor = new FileWriteAccessor(); @@ -59,6 +61,7 @@ public class FileIOEngine extends PersistentIOEngine { this.sizePerFile = capacity / filePaths.length; this.capacity = this.sizePerFile * filePaths.length; this.fileChannels = new FileChannel[filePaths.length]; + this.maintainPersistence = maintainPersistence; if (!maintainPersistence) { for (String filePath : filePaths) { File file = new File(filePath); @@ -145,10 +148,42 @@ public class FileIOEngine extends PersistentIOEngine { throw ioe; } } - dstBuff.rewind(); + if (maintainPersistence) { + dstBuff.position(length - Long.BYTES); + long cachedNanoTime = dstBuff.getLong(); + if (be.getCachedTime() != cachedNanoTime) { + dstBuff.release(); + throw new HBaseIOException("The cached time recorded within the cached block differs " + + "from its bucket entry, so it might not be the same."); + } + dstBuff.rewind(); + dstBuff.limit(length - Long.BYTES); + dstBuff = dstBuff.slice(); + } else { + dstBuff.rewind(); + } return be.wrapAsCacheable(dstBuff); } + void checkCacheTime(BucketEntry be) throws IOException { + long offset = be.offset(); + int length = be.getLength(); + ByteBuff dstBuff = be.allocator.allocate(Long.BYTES); + try { + accessFile(readAccessor, dstBuff, (offset + length - Long.BYTES)); + } catch (IOException ioe) { + dstBuff.release(); + throw ioe; + } + dstBuff.rewind(); + long cachedNanoTime = dstBuff.getLong(); + if (be.getCachedTime() != cachedNanoTime) { + dstBuff.release(); + throw new HBaseIOException("The cached time recorded within the cached block differs " + + "from its bucket entry, so it might not be the same."); + } + } + void closeFileChannels() { for (FileChannel fileChannel : fileChannels) { try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java index 2c8bb66bfb9..8cd80e755cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java @@ -78,7 +78,6 @@ public class TestBlockEvictionOnRegionMovement { conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache"); conf.setInt("hbase.bucketcache.size", 400); conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence"); - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence"); conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100); conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true); zkCluster = TEST_UTIL.startMiniZKCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java index ee324e9dbaa..3794c6c28d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java @@ -74,7 +74,6 @@ public class TestPrefetchRSClose { conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache"); conf.setInt("hbase.bucketcache.size", 400); conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence"); - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence"); zkCluster = TEST_UTIL.startMiniZKCluster(); cluster = TEST_UTIL.startMiniHBaseCluster(option); assertEquals(2, cluster.getRegionServerThreads().size()); @@ -113,18 +112,15 @@ public class TestPrefetchRSClose { // Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files // should exist. assertTrue(new File(testDir + "/bucket.persistence").exists()); - assertTrue(new File(testDir + "/prefetch.persistence").exists()); // Stop the RS cluster.stopRegionServer(0); LOG.info("Stopped Region Server 0."); Thread.sleep(1000); assertTrue(new File(testDir + "/bucket.persistence").exists()); - assertTrue(new File(testDir + "/prefetch.persistence").exists()); // Start the RS and validate cluster.startRegionServer(); - assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertFalse(new File(testDir + "/bucket.persistence").exists()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java new file mode 100644 index 00000000000..972d4942819 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +@Category({ IOTests.class, MediumTests.class }) +public class TestPrefetchWithBucketCache { + + private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class); + + @Rule + public TestName name = new TestName(); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; + private static final int DATA_BLOCK_SIZE = 2048; + private static final int NUM_KV = 100; + + private Configuration conf; + private CacheConfig cacheConf; + private FileSystem fs; + private BlockCache blockCache; + + @Before + public void setUp() throws IOException { + conf = TEST_UTIL.getConfiguration(); + conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); + fs = HFileSystem.get(conf); + File testDir = new File(name.getMethodName()); + testDir.mkdir(); + conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache"); + conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); + blockCache = BlockCacheFactory.createBlockCache(conf); + cacheConf = new CacheConfig(conf, blockCache); + } + + @After + public void tearDown() { + File cacheFile = new File(name.getMethodName() + "/bucket.cache"); + File dir = new File(name.getMethodName()); + cacheFile.delete(); + dir.delete(); + } + + @Test + public void testPrefetchDoesntOverwork() throws Exception { + Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork"); + // Prefetches the file blocks + LOG.debug("First read should prefetch the blocks."); + readStoreFile(storeFile); + BucketCache bc = BucketCache.getBuckedCacheFromCacheConfig(cacheConf).get(); + // Our file should have 6 DATA blocks. 
We should wait for all of them to be cached + Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); + Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap()); + // Reads file again and check we are not prefetching it again + LOG.debug("Second read, no prefetch should happen here."); + readStoreFile(storeFile); + // Makes sure the cache hasn't changed + snapshot.entrySet().forEach(e -> { + BucketEntry entry = bc.getBackingMap().get(e.getKey()); + assertNotNull(entry); + assertEquals(e.getValue().getCachedTime(), entry.getCachedTime()); + }); + // forcibly removes first block from the bc backing map, in order to cause it to be cached again + BlockCacheKey key = snapshot.keySet().stream().findFirst().get(); + LOG.debug("removing block {}", key); + bc.getBackingMap().remove(key); + bc.getFullyCachedFiles().remove(storeFile.getName()); + assertTrue(snapshot.size() > bc.getBackingMap().size()); + LOG.debug("Third read should prefetch again, as we removed one block for the file."); + readStoreFile(storeFile); + Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size()); + assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime()); + } + + private void readStoreFile(Path storeFilePath) throws Exception { + readStoreFile(storeFilePath, (r, o) -> { + HFileBlock block = null; + try { + block = r.readBlock(o, -1, false, true, false, true, null, null); + } catch (IOException e) { + fail(e.getMessage()); + } + return block; + }, (key, block) -> { + boolean isCached = blockCache.getBlock(key, true, false, true) != null; + if ( + block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX + ) { + assertTrue(isCached); + } + }); + } + + private void readStoreFile(Path storeFilePath, + BiFunction<HFile.Reader, Long, HFileBlock> readFunction, + BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception { + // Open the file + HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); + + while (!reader.prefetchComplete()) { + // Sleep for a bit + Thread.sleep(1000); + } + long offset = 0; + while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + HFileBlock block = readFunction.apply(reader, offset); + BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); + validationFunction.accept(blockCacheKey, block); + offset += block.getOnDiskSizeWithHeader(); + } + } + + private Path writeStoreFile(String fname) throws IOException { + HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); + return writeStoreFile(fname, meta); + } + + private Path writeStoreFile(String fname, HFileContext context) throws IOException { + Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); + StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withOutputDir(storeFileParentDir).withFileContext(context).build(); + Random rand = ThreadLocalRandom.current(); + final int rowLen = 32; + for (int i = 0; i < NUM_KV; ++i) { + byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); + byte[] v = RandomKeyValueUtil.randomValue(rand); + int cfLen = rand.nextInt(k.length - rowLen + 1); + KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, + k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); + sfw.append(kv); + } + + sfw.close(); + return sfw.getPath(); + } + + public static 
KeyValue.Type generateKeyType(Random rand) { + if (rand.nextBoolean()) { + // Let's make half of KVs puts. + return KeyValue.Type.Put; + } else { + KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; + if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { + throw new RuntimeException("Generated an invalid key type: " + keyType + ". " + + "Probably the layout of KeyValue.Type has changed."); + } + return keyType; + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index bbc15c555b0..075f10d08c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; @@ -110,8 +111,7 @@ public class TestBucketCache { final long capacitySize = 32 * 1024 * 1024; final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; - String ioEngineName = "offheap"; - String persistencePath = null; + private String ioEngineName = "offheap"; private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); @@ -137,7 +137,7 @@ public class TestBucketCache { @Before public void setup() throws IOException { cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, persistencePath); + constructedBlockSizes, writeThreads, writerQLen, null); } @After @@ -294,51 +294,149 @@ public class TestBucketCache { @Test public void testRetrieveFromFile() throws Exception { - HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - Path testDir = TEST_UTIL.getDataTestDir(); - TEST_UTIL.getTestFileSystem().mkdirs(testDir); - + Path testDir = createAndGetTestDir(); String ioEngineName = "file:" + testDir + "/bucket.cache"; + testRetrievalUtils(testDir, ioEngineName); + int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; String persistencePath = testDir + "/bucket.persistence"; + BucketCache bucketCache = null; + try { + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + } finally { + bucketCache.shutdown(); + HBASE_TESTING_UTILITY.cleanupTestDir(); + } + } - BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, persistencePath); - long usedSize = bucketCache.getAllocator().getUsedSize(); - assertEquals(0, usedSize); - - HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); - // Add blocks - for (HFileBlockPair block : blocks) { - bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); + @Test + public void testRetrieveFromMMap() 
throws Exception { + final Path testDir = createAndGetTestDir(); + final String ioEngineName = "mmap:" + testDir + "/bucket.cache"; + testRetrievalUtils(testDir, ioEngineName); + } + + @Test + public void testRetrieveFromPMem() throws Exception { + final Path testDir = createAndGetTestDir(); + final String ioEngineName = "pmem:" + testDir + "/bucket.cache"; + testRetrievalUtils(testDir, ioEngineName); + int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; + String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); + BucketCache bucketCache = null; + try { + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + } finally { + bucketCache.shutdown(); + HBASE_TESTING_UTILITY.cleanupTestDir(); } - for (HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), false); + } + + private void testRetrievalUtils(Path testDir, String ioEngineName) + throws IOException, InterruptedException { + final String persistencePath = + testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); + BucketCache bucketCache = null; + try { + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertEquals(0, usedSize); + HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + for (HFileBlockPair block : blocks) { + bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); + } + for (HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), + false); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertNotEquals(0, usedSize); + bucketCache.shutdown(); + assertTrue(new File(persistencePath).exists()); + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); + } finally { + if (bucketCache != null) { + bucketCache.shutdown(); + } } - usedSize = bucketCache.getAllocator().getUsedSize(); - assertNotEquals(0, usedSize); - // persist cache to file - bucketCache.shutdown(); assertTrue(new File(persistencePath).exists()); + } - // restore cache from file - bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, persistencePath); - assertFalse(new File(persistencePath).exists()); - assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); - // persist cache to file - bucketCache.shutdown(); - assertTrue(new File(persistencePath).exists()); + @Test + public void testRetrieveUnsupportedIOE() throws Exception { + try { + final Path testDir = createAndGetTestDir(); + final String ioEngineName = testDir + "/bucket.cache"; + testRetrievalUtils(testDir, ioEngineName); + Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!"); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " + + "files:, 
mmap: or offheap", e.getMessage()); + } + } - // reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k) - // so it can't restore cache from file - int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 }; - bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - smallBucketSizes, writeThreads, writerQLen, persistencePath); - assertFalse(new File(persistencePath).exists()); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); + @Test + public void testRetrieveFromMultipleFiles() throws Exception { + final Path testDirInitial = createAndGetTestDir(); + final Path newTestDir = new HBaseTestingUtility().getDataTestDir(); + HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir); + String ioEngineName = + new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache") + .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString(); + testRetrievalUtils(testDirInitial, ioEngineName); + int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; + String persistencePath = testDirInitial + "/bucket.persistence"; + BucketCache bucketCache = null; + try { + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + } finally { + bucketCache.shutdown(); + HBASE_TESTING_UTILITY.cleanupTestDir(); + } + } - TEST_UTIL.cleanupTestDir(); + @Test + public void testRetrieveFromFileWithoutPersistence() throws Exception { + BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, null); + try { + final Path testDir = createAndGetTestDir(); + String ioEngineName = "file:" + testDir + "/bucket.cache"; + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertEquals(0, usedSize); + HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + for (HFileBlockPair block : blocks) { + bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); + } + for (HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), + false); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertNotEquals(0, usedSize); + bucketCache.shutdown(); + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, null); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + } finally { + bucketCache.shutdown(); + HBASE_TESTING_UTILITY.cleanupTestDir(); + } } @Test @@ -362,13 +460,32 @@ public class TestBucketCache { conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf); + constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); validateGetPartitionSize(cache, 0.1f, 0.5f); validateGetPartitionSize(cache, 0.7f, 0.5f); validateGetPartitionSize(cache, 0.2f, 0.5f); } + @Test + public void testCacheSizeCapacity() throws IOException { + // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE + validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, + 
BucketCache.DEFAULT_MIN_FACTOR); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); + conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); + conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); + conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); + try { + new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads, + writerQLen, null, 100, conf); + Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!"); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage()); + } + } + @Test public void testValidBucketCacheConfigs() throws IOException { Configuration conf = HBaseConfiguration.create(); @@ -380,7 +497,7 @@ public class TestBucketCache { conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf); + constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, cache.getAcceptableFactor(), 0); @@ -453,7 +570,7 @@ public class TestBucketCache { conf.setFloat(configName, configMap.get(configName)[i]); } BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf); + constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); } catch (IllegalArgumentException e) { @@ -475,7 +592,7 @@ public class TestBucketCache { // This number is picked because it produces negative output if the values isn't ensured to be // positive. See HBASE-18757 for more information. 
long testValue = 549888460800L; - BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true, (entry) -> { + BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> { return ByteBuffAllocator.NONE; }, ByteBuffAllocator.HEAP); assertEquals(testValue, bucketEntry.offset()); @@ -604,8 +721,8 @@ public class TestBucketCache { HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); - RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false); - RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false); + RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false); + RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false); assertFalse(cache.containsKey(key1)); assertNull(cache.putIfAbsent(key1, re1)); @@ -652,7 +769,7 @@ public class TestBucketCache { BucketAllocator allocator = new BucketAllocator(availableSpace, null); BlockCacheKey key = new BlockCacheKey("dummy", 1L); - RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true); + RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false); Assert.assertEquals(0, allocator.getUsedSize()); try { @@ -671,13 +788,14 @@ public class TestBucketCache { */ @Test public void testFreeBucketEntryRestoredFromFile() throws Exception { + BucketCache bucketCache = null; try { final Path dataTestDir = createAndGetTestDir(); String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; - BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -712,19 +830,21 @@ public class TestBucketCache { assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } finally { + bucketCache.shutdown(); HBASE_TESTING_UTILITY.cleanupTestDir(); } } @Test public void testBlockAdditionWaitWhenCache() throws Exception { + BucketCache bucketCache = null; try { final Path dataTestDir = createAndGetTestDir(); String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; - BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, 1, 1, persistencePath); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -767,6 +887,9 @@ public class TestBucketCache { assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } finally { + if (bucketCache != null) { + bucketCache.shutdown(); + } HBASE_TESTING_UTILITY.cleanupTestDir(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java index 2fbcf2850e3..f5abbb1bf67 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; @@ -85,7 +84,6 @@ public class TestBucketCachePersister { } public BucketCache setupBucketCache(Configuration conf) throws IOException { - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, (testDir + "/prefetch.persistence")); BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); @@ -111,9 +109,7 @@ public class TestBucketCachePersister { readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache); Thread.sleep(bucketCachePersistInterval); - assertTrue(new File(testDir + "/prefetch.persistence").exists()); assertTrue(new File(testDir + "/bucket.persistence").exists()); - assertTrue(new File(testDir + "/prefetch.persistence").delete()); assertTrue(new File(testDir + "/bucket.persistence").delete()); cleanupBucketCache(bucketCache); } @@ -128,7 +124,6 @@ public class TestBucketCachePersister { // Load Cache Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs); readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); - assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertFalse(new File(testDir + "/bucket.persistence").exists()); cleanupBucketCache(bucketCache); } @@ -144,10 +139,10 @@ public class TestBucketCachePersister { readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1); Thread.sleep(500); // Evict Blocks from cache + assertTrue(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName())); BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey(); - assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName())); bucketCache1.evictBlock(bucketCacheKey); - assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName())); + assertFalse(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName())); } public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java index 820e91aa6e8..b42e7be804d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java @@ -49,7 +49,7 @@ public class TestByteBufferIOEngine { private long off; MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) { - super(offset & 0xFF00, length, 0, false, (entry) -> { + super(offset & 0xFF00, length, length, 0, false, (entry) -> { return ByteBuffAllocator.NONE; }, allocator); this.off = offset; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 900340e57ad..d4f0d89bbd2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; @@ -106,8 +105,6 @@ public class TestPrefetchPersistence { conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); testDir = TEST_UTIL.getDataTestDir(); TEST_UTIL.getTestFileSystem().mkdirs(testDir); - prefetchPersistencePath = testDir + "/prefetch.persistence"; - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, prefetchPersistencePath); fs = HFileSystem.get(conf); } @@ -132,10 +129,10 @@ public class TestPrefetchPersistence { bucketCache.shutdown(); assertTrue(new File(testDir + "/bucket.persistence").exists()); - assertTrue(new File(testDir + "/prefetch.persistence").exists()); bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); + cacheConf = new CacheConfig(conf, bucketCache); assertFalse(new File(testDir + "/bucket.persistence").exists()); assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertTrue(usedSize != 0); @@ -148,9 +145,9 @@ public class TestPrefetchPersistence { public void closeStoreFile(Path path) throws Exception { HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); - assertTrue(PrefetchExecutor.isFilePrefetched(path.getName())); + assertTrue(bucketCache.fullyCachedFiles.containsKey(path.getName())); reader.close(true); - assertFalse(PrefetchExecutor.isFilePrefetched(path.getName())); + assertFalse(bucketCache.fullyCachedFiles.containsKey(path.getName())); } public void readStoreFile(Path storeFilePath, long offset) throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java index 0e777a4a7b9..58d9385f57e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java @@ -90,7 +90,7 @@ public class TestRAMCache { MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1, ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, new HFileContextBuilder().build(), ByteBuffAllocator.HEAP); - RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false); + RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false); Assert.assertNull(cache.putIfAbsent(key, re)); Assert.assertEquals(cache.putIfAbsent(key, re), re); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java index 894e12b5f5a..f0d61293082 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; +import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.BufferedWriter; +import java.io.File; import java.io.FileOutputStream; import java.io.OutputStreamWriter; import java.nio.file.FileSystems; @@ -33,11 +37,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -153,10 +160,10 @@ public class TestVerifyBucketCacheFile { TEST_UTIL.getTestFileSystem().mkdirs(testDir); Configuration conf = TEST_UTIL.getConfiguration(); conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300); - - BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, - constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, - testDir + "/bucket.persistence", 60 * 1000, conf); + String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -171,14 +178,13 @@ public class TestVerifyBucketCacheFile { // Shutdown BucketCache bucketCache.shutdown(); // Delete the persistence file - final java.nio.file.Path mapFile = - FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence"); - assertTrue(Files.deleteIfExists(mapFile)); + File mapFile = new File(mapFileName); + assertTrue(mapFile.delete()); Thread.sleep(350); // Create BucketCache - bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, - constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, - testDir + "/bucket.persistence", 60 * 1000, conf); + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } @@ -232,9 +238,15 @@ public class TestVerifyBucketCacheFile { /** * Test whether BucketCache is started normally after modifying the cache file's last modified * time. 
First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache - * to file. Then Restart BucketCache after modify cache file's last modified time, and it can't - * restore cache from file, the cache file and persistence file would be deleted before - * BucketCache start normally. + * to file. Then restart BucketCache after modifying the cache file's last modified time. HBASE-XXXX has + * modified cache persistence such that we now store an extra 8 bytes at the end of each block in the + * cache, representing the nanosecond time at which the block was cached. So in the event the cache + * file fails checksum verification at loading time, we go through all the cached blocks + * in the cache map and compare the cached-time value recorded in the map with the one stored in the cache + * file. If that check fails, we pull the cache key entry out of the map. Since in this test we + * only modify the access time to induce a checksum error, the cache file content is still + * valid and the extra verification should confirm that all cache keys in the map are still + * recoverable from the cache. + * @throws Exception the exception */ @Test @@ -249,6 +261,8 @@ public class TestVerifyBucketCacheFile { long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); + Pair<String, Long> myPair = new Pair<>(); + CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); // Add blocks @@ -257,6 +271,8 @@ } usedSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedSize); + long blockCount = bucketCache.backingMap.size(); + assertNotEquals(0, blockCount); // persist cache to file bucketCache.shutdown(); @@ -268,9 +284,64 @@ bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); + assertEquals(blockCount, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + /** + * When using a persistent bucket cache, a crash may happen between persisting the backing map and + * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache + * keys and the cached data. This test makes sure the cache keys are updated accordingly, and that the + * keys that are still valid can retrieve the related block data from the cache without any + * corruption.
+ * @throws Exception the exception + */ + @Test + public void testBucketCacheRecovery() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + Configuration conf = HBaseConfiguration.create(); + // Disables the persister thread by setting its interval to MAX_VALUE + conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); + String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); + BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, + DEFAULT_ERROR_TOLERATION_DURATION, conf); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); + // Add three blocks + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); + // saves the current state + bucketCache.persistToFile(); + // evicts first block + bucketCache.evictBlock(blocks[0].getBlockName()); + + // now adds a fourth block to bucket cache + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); + // Creates new bucket cache instance without persisting to file after evicting first block + // and caching fourth block. So the bucket cache file has only the last three blocks, + // but backing map (containing cache keys) was persisted when first three blocks + // were in the cache. So the state on this recovery is: + // - Backing map: [block0, block1, block2] + // - Cache: [block1, block2, block3] + // Therefore, this bucket cache would be able to recover only block1 and block2. + BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, + DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false)); + assertEquals(blocks[1].getBlock(), + newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false)); + assertEquals(blocks[2].getBlock(), + newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false)); + assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false)); + assertEquals(2, newBucketCache.backingMap.size()); TEST_UTIL.cleanupTestDir(); }
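The javadoc added to TestVerifyBucketCacheFile above describes the recovery rule for a persistent bucket cache: each block in the cache file now carries an extra 8-byte cached-time value, and when the persisted state cannot be trusted wholesale, a backing-map entry is kept only if the cached time recorded in the map matches the one stored next to the block in the file. The following is a minimal sketch of that idea, assuming an invented PersistedEntry layout and recover() helper; it is not the actual BucketCache implementation.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.HashMap;
    import java.util.Map;

    public class CachedTimeRecoverySketch {

      // Hypothetical persisted backing-map entry: block offset, block length (without the
      // trailing timestamp) and the cached-time nanos recorded when the block was cached.
      public static final class PersistedEntry {
        final long offset;
        final int blockLength;
        final long cachedTimeNanos;

        PersistedEntry(long offset, int blockLength, long cachedTimeNanos) {
          this.offset = offset;
          this.blockLength = blockLength;
          this.cachedTimeNanos = cachedTimeNanos;
        }
      }

      // Keeps only the entries whose cached time in the map matches the 8 bytes written
      // right after the block data in the cache file; mismatching keys are dropped.
      public static Map<String, PersistedEntry> recover(Map<String, PersistedEntry> persisted,
        RandomAccessFile cacheFile) throws IOException {
        Map<String, PersistedEntry> recovered = new HashMap<>();
        for (Map.Entry<String, PersistedEntry> e : persisted.entrySet()) {
          PersistedEntry entry = e.getValue();
          cacheFile.seek(entry.offset + entry.blockLength);
          long cachedTimeInFile = cacheFile.readLong();
          if (cachedTimeInFile == entry.cachedTimeNanos) {
            // The key still points at consistent data, so it survives the restart.
            recovered.put(e.getKey(), entry);
          }
          // Otherwise the space was rewritten after the map was persisted (for example, an
          // evicted block's bucket was reused), so the stale key is pulled out of the map.
        }
        return recovered;
      }
    }

Read against testBucketCacheRecovery above, this is presumably why only block1 and block2 survive the restart: block0 was evicted and its space reused for block3 after the map had been persisted, so block0's recorded metadata no longer matches the cache file, while block3 was never in the persisted map at all.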
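The updated assertions in TestBucketCachePersister and TestPrefetchPersistence above replace the old PrefetchExecutor.isFilePrefetched() checks with lookups on the bucket cache's fullyCachedFiles map: a file shows up there once prefetch has cached all of its blocks and disappears as soon as one of its blocks is evicted. The snippet below is a hypothetical, stand-alone tracker illustrating that bookkeeping in isolation; the class and method names are invented and are not the BucketCache API.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class FullyCachedFilesTracker {

      // File name -> fully-cached flag, analogous in spirit to the map the tests assert on.
      private final Map<String, Boolean> fullyCachedFiles = new ConcurrentHashMap<>();

      // Called when prefetch finishes caching every block of the given file.
      public void fileCacheCompleted(String fileName) {
        fullyCachedFiles.put(fileName, true);
      }

      // Called on block eviction: the file can no longer be considered fully cached.
      public void blockEvicted(String fileName) {
        fullyCachedFiles.remove(fileName);
      }

      // A reader reopening the file consults this to decide whether prefetch can be skipped.
      public boolean shouldSkipPrefetch(String fileName) {
        return fullyCachedFiles.containsKey(fileName);
      }
    }

With a tracker like this, the sequence asserted in TestBucketCachePersister reads naturally: the lookup is true right after the prefetch completes and false right after evictBlock() removes any of the file's keys.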