This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch HBASE-28463
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28463 by this push:
     new bfef12af56d HBASE-28468: Integrate the data-tiering logic into cache evictions. (#5829)
bfef12af56d is described below

commit bfef12af56d6a5091dad776c5c3b4daa73172b2a
Author: jhungund <106576553+jhung...@users.noreply.github.com>
AuthorDate: Thu Apr 25 15:29:36 2024 +0530

    HBASE-28468: Integrate the data-tiering logic into cache evictions. (#5829)

    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  45 +++++-
 .../hbase/regionserver/DataTieringManager.java     |  42 ++++-
 .../apache/hadoop/hbase/io/hfile/TestPrefetch.java |   4 +-
 .../hbase/regionserver/TestDataTieringManager.java | 178 +++++++++++++++++++++
 4 files changed, 263 insertions(+), 6 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 622a57f91c2..5a9c7795a33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -974,6 +974,7 @@ public class BucketCache implements BlockCache, HeapSize {
     long bytesToFreeWithExtra =
       (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
+    long bytesFreed = 0;
     // Instantiate priority buckets
     BucketEntryGroup bucketSingle =
       new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
@@ -982,9 +983,36 @@ public class BucketCache implements BlockCache, HeapSize {
     BucketEntryGroup bucketMemory =
       new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));

+    // Check the list of files to determine the cold files which can be readily evicted.
+    Map<String, String> coldFiles = null;
+    try {
+      DataTieringManager dataTieringManager = DataTieringManager.getInstance();
+      coldFiles = dataTieringManager.getColdFilesList();
+    } catch (IllegalStateException e) {
+      LOG.warn("Data Tiering Manager is not set. Ignoring time-based block evictions.");
+    }
     // Scan entire map putting bucket entry into appropriate bucket entry
     // group
     for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
+      if (
+        coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
+      ) {
+        int freedBlockSize = bucketEntryWithKey.getValue().getLength();
+        if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
+          bytesFreed += freedBlockSize;
+        }
+        if (bytesFreed >= bytesToFreeWithExtra) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+              "Bucket cache free space completed; required: {} freed: {} from cold data blocks.",
+              bytesToFreeWithExtra, StringUtils.byteDesc(bytesFreed));
+          }
+          // Sufficient bytes have been freed.
+          return;
+        }
+        continue;
+      }
       switch (bucketEntryWithKey.getValue().getPriority()) {
         case SINGLE: {
           bucketSingle.add(bucketEntryWithKey);
@@ -1001,6 +1029,21 @@ public class BucketCache implements BlockCache, HeapSize {
       }
     }

+    // Check if the cold file eviction is sufficient to create enough space.
+    bytesToFreeWithExtra -= bytesFreed;
+    if (bytesToFreeWithExtra <= 0) {
+      LOG.debug("Bucket cache free space completed; freed space: {} of cold data blocks.",
+        StringUtils.byteDesc(bytesFreed));
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+        "Bucket cache free space in progress; freed space: {} "
+          + "of cold data blocks. {} more bytes need to be freed.",
+        StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra);
+    }
+
     PriorityQueue<BucketEntryGroup> bucketQueue =
       new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));

@@ -1009,8 +1052,6 @@ public class BucketCache implements BlockCache, HeapSize {
     bucketQueue.add(bucketMemory);

     int remainingBuckets = bucketQueue.size();
-    long bytesFreed = 0;
-
     BucketEntryGroup bucketGroup;
     while ((bucketGroup = bucketQueue.poll()) != null) {
       long overflow = bucketGroup.overflow();
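The freeSpace() change above amounts to a two-phase pass: blocks belonging to cold files are evicted first, and the usual SINGLE/MULTI/MEMORY priority-group eviction only runs for whatever shortfall remains. The following minimal, self-contained Java sketch illustrates that idea under simplifying assumptions; it models the cache as a plain map rather than the BucketCache/BucketEntry classes, and the class name, method name, and "<file>#<offset>" key encoding are illustrative, not HBase API.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class ColdFirstEvictionSketch {

      // Phase 1 of the eviction pass: free blocks of cold files first and stop
      // as soon as enough bytes are reclaimed. Phase 2 (the SINGLE/MULTI/MEMORY
      // priority groups) would only handle the remaining shortfall.
      static long evictColdBlocks(Map<String, Long> backingMap, Set<String> coldFiles,
          long bytesToFree) {
        long bytesFreed = 0;
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : backingMap.entrySet()) {
          // Keys are encoded as "<hfileName>#<offset>" purely for this sketch.
          String hfileName = entry.getKey().substring(0, entry.getKey().indexOf('#'));
          if (coldFiles.contains(hfileName)) {
            bytesFreed += entry.getValue();
            evicted.add(entry.getKey());
            if (bytesFreed >= bytesToFree) {
              break; // cold data alone satisfied the request
            }
          }
        }
        evicted.forEach(backingMap::remove);
        return bytesFreed;
      }

      public static void main(String[] args) {
        Map<String, Long> backingMap = new LinkedHashMap<>();
        backingMap.put("coldFile#0", 8192L);
        backingMap.put("coldFile#8192", 8192L);
        backingMap.put("hotFile#0", 8192L);
        long freed = evictColdBlocks(backingMap, Set.of("coldFile"), 16384L);
        // Prints: freed=16384 remaining=[hotFile#0]
        System.out.println("freed=" + freed + " remaining=" + backingMap.keySet());
      }
    }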
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
index dec96604774..6c699e77c2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.OptionalLong;
@@ -173,12 +174,12 @@ public class DataTieringManager {
   private long getMaxTimestamp(Path hFilePath) throws DataTieringException {
     HStoreFile hStoreFile = getHStoreFile(hFilePath);
     if (hStoreFile == null) {
-      LOG.error("HStoreFile corresponding to " + hFilePath + " doesn't exist");
+      LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath);
       return Long.MAX_VALUE;
     }
     OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
     if (!maxTimestamp.isPresent()) {
-      LOG.error("Maximum timestamp not present for " + hFilePath);
+      LOG.error("Maximum timestamp not present for {}", hFilePath);
       return Long.MAX_VALUE;
     }
     return maxTimestamp.getAsLong();
@@ -270,4 +271,41 @@ public class DataTieringManager {
     return Long.parseLong(
       conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
   }
+
+  /*
+   * This API traverses the list of online regions and returns the subset of their store file
+   * names that are cold.
+   * @return Map keyed by the names of files with cold data as per the data-tiering logic.
+   */
+  public Map<String, String> getColdFilesList() {
+    Map<String, String> coldFiles = new HashMap<>();
+    for (HRegion r : this.onlineRegions.values()) {
+      for (HStore hStore : r.getStores()) {
+        Configuration conf = hStore.getReadOnlyConfiguration();
+        if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) {
+          // Data-tiering is not enabled for the store. Just skip it.
+          continue;
+        }
+        Long hotDataAge = getDataTieringHotDataAge(conf);
+
+        for (HStoreFile hStoreFile : hStore.getStorefiles()) {
+          String hFileName =
+            hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
+          OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
+          if (!maxTimestamp.isPresent()) {
+            LOG.warn("maxTimestamp missing for file: {}",
+              hStoreFile.getFileInfo().getActiveFileName());
+            continue;
+          }
+          long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime();
+          long fileAge = currentTimestamp - maxTimestamp.getAsLong();
+          if (fileAge > hotDataAge) {
+            // Values do not matter.
+            coldFiles.put(hFileName, null);
+          }
+        }
+      }
+    }
+    return coldFiles;
+  }
 }
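getColdFilesList() classifies a store file as cold when the gap between the current time and the file's maximum cell timestamp exceeds the configured hot-data age, and only for stores with DataTieringType.TIME_RANGE enabled. The standalone sketch below captures just that classification rule; the FileMeta record and the fixed thresholds are hypothetical stand-ins for HStoreFile metadata, not the actual HBase types (requires Java 16+ for records).

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.OptionalLong;

    public class ColdFileClassifierSketch {

      // Hypothetical stand-in for the store-file metadata the real code reads
      // from HStoreFile; only the name and the maximum cell timestamp matter here.
      record FileMeta(String name, OptionalLong maxTimestamp) {}

      static Map<String, String> coldFiles(List<FileMeta> files, long hotDataAgeMillis,
          long nowMillis) {
        Map<String, String> cold = new HashMap<>();
        for (FileMeta file : files) {
          if (!file.maxTimestamp().isPresent()) {
            continue; // mirror the production code: skip files without a max timestamp
          }
          long fileAge = nowMillis - file.maxTimestamp().getAsLong();
          if (fileAge > hotDataAgeMillis) {
            cold.put(file.name(), null); // values are unused; only the key set matters
          }
        }
        return cold;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long oneDay = 24L * 60 * 60 * 1000;
        List<FileMeta> files = List.of(
          new FileMeta("recent-hfile", OptionalLong.of(now - oneDay)),
          new FileMeta("old-hfile", OptionalLong.of(now - 10 * oneDay)));
        // With a 7-day hot-data age, only "old-hfile" is classified as cold.
        System.out.println(coldFiles(files, 7 * oneDay, now).keySet());
      }
    }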
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 6083d872c82..5c1bf7bb913 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -373,8 +373,8 @@ public class TestPrefetch {
     Thread.sleep(20000);
     assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
     while (!reader.prefetchStarted()) {
-      assertTrue("Prefetch delay has not been expired yet",
-        getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
+      // Wait until the prefetch is triggered.
+      Thread.sleep(500);
     }
     if (reader.prefetchStarted()) {
       // Added some delay as we have started the timer a bit late.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
index 6beed9943b3..f758bb11c5b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -51,7 +52,9 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -245,6 +248,181 @@ public class TestDataTieringManager {
     }
   }

+  @Test
+  public void testPickColdDataFiles() {
+    Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList();
+    assertEquals(1, coldDataFiles.size());
+    // hStoreFiles[3] is the cold file.
+    assertTrue(coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName()));
+  }
+
+  /*
+   * Verify that both cold blocks are evicted when the bucket cache reaches its capacity, while
+   * the hot file remains in the cache.
+   */
+  @Test
+  public void testBlockEvictions() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: create a bucket cache with a small capacity.
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three cache keys: two for cold data blocks and one for a hot data block.
+    // hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100,
+        () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block of hot data into the cache, which should trigger the eviction.
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 2 hot blocks only.
+    // Both 8KB cold blocks are evicted to make room for one 8KB block plus the extra free space.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
+  }
+
+  /*
+   * Verify that two of the three cold blocks are evicted when the bucket cache reaches its
+   * capacity, while one cold block remains in the cache because the required space has already
+   * been freed.
+   */
+  @Test
+  public void testBlockEvictionsAllColdBlocks() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: create a bucket cache with a small capacity.
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three cache keys, all pointing at cold data blocks.
+    // hStoreFiles.get(3) is a cold data file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100,
+        () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block of hot data into the cache, which should trigger the eviction.
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 1 cold block and the newly added hot block.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
+  }
+
+  /*
+   * Verify that a hot block is evicted along with a cold block when the bucket cache reaches
+   * its capacity.
+   */
+  @Test
+  public void testBlockEvictionsHotBlocks() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: create a bucket cache with a small capacity.
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three cache keys: two for hot data blocks and one for a cold data block.
+    // hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100,
+        () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add one more hot block; this should evict the only cold block along with one hot block.
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 2 hot blocks:
+    // only one of the older hot blocks is retained, and the other is the newly added hot block.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
+  }
+
+  private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
+    int expectedColdBlocks) {
+    int numHotBlocks = 0, numColdBlocks = 0;
+
+    assertEquals(expectedTotalKeys, keys.size());
+    for (BlockCacheKey key : keys) {
+      try {
+        if (dataTieringManager.isHotData(key)) {
+          numHotBlocks++;
+        } else {
+          numColdBlocks++;
+        }
+      } catch (Exception e) {
+        fail("Unexpected exception!");
+      }
+    }
+    assertEquals(expectedHotBlocks, numHotBlocks);
+    assertEquals(expectedColdBlocks, numColdBlocks);
+  }
+
   private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
     boolean expectedResult, DataTieringException exception) {
     try {
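The validateBlocks() helper above encodes the tests' post-eviction invariant: the surviving cache keys are partitioned into hot and cold via DataTieringManager.isHotData() and compared against expected counts. Below is a stripped-down, self-contained sketch of the same pattern, with a Predicate standing in for the tiering check; the class name and key strings are illustrative only.

    import java.util.Set;
    import java.util.function.Predicate;

    public class EvictionOutcomeCheck {

      // Count the hot and cold survivors and compare against expectations;
      // the Predicate stands in for DataTieringManager.isHotData(key).
      static void validate(Set<String> keys, Predicate<String> isHot, int expectedTotal,
          int expectedHot, int expectedCold) {
        if (keys.size() != expectedTotal) {
          throw new AssertionError("expected " + expectedTotal + " keys, got " + keys.size());
        }
        int hot = 0;
        int cold = 0;
        for (String key : keys) {
          if (isHot.test(key)) {
            hot++;
          } else {
            cold++;
          }
        }
        if (hot != expectedHot || cold != expectedCold) {
          throw new AssertionError("expected " + expectedHot + " hot / " + expectedCold
            + " cold, got " + hot + " / " + cold);
        }
      }

      public static void main(String[] args) {
        // After testBlockEvictions, only the two hot blocks should survive.
        validate(Set.of("hotFile#0", "hotFile#8192"), key -> key.startsWith("hot"), 2, 2, 0);
        System.out.println("eviction outcome matches expectations");
      }
    }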