This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch HBASE-28463
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit f890281bf9096b10ef87ad73a6206f51507b1fdc
Author: jhungund <106576553+jhung...@users.noreply.github.com>
AuthorDate: Thu Apr 25 15:29:36 2024 +0530

    HBASE-28468: Integrate the data-tiering logic into cache evictions. (#5829)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  45 +++++-
 .../hbase/regionserver/DataTieringManager.java     |  42 ++++-
 .../apache/hadoop/hbase/io/hfile/TestPrefetch.java |   4 +-
 .../hbase/regionserver/TestDataTieringManager.java | 178 +++++++++++++++++++++
 4 files changed, 263 insertions(+), 6 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 622a57f91c2..5a9c7795a33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -974,6 +974,7 @@ public class BucketCache implements BlockCache, HeapSize {
       long bytesToFreeWithExtra =
         (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
 
+      long bytesFreed = 0;
       // Instantiate priority buckets
       BucketEntryGroup bucketSingle =
         new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
@@ -982,9 +983,36 @@ public class BucketCache implements BlockCache, HeapSize {
       BucketEntryGroup bucketMemory =
         new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));
 
+      // Check the list of files to determine which cold files can be readily evicted.
+      Map<String, String> coldFiles = null;
+      try {
+        DataTieringManager dataTieringManager = DataTieringManager.getInstance();
+        coldFiles = dataTieringManager.getColdFilesList();
+      } catch (IllegalStateException e) {
+        LOG.warn("Data Tiering Manager is not set. Ignore time-based block 
evictions.");
+      }
       // Scan entire map putting bucket entry into appropriate bucket entry
       // group
       for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
+        if (
+          coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
+        ) {
+          int freedBlockSize = bucketEntryWithKey.getValue().getLength();
+          if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
+            bytesFreed += freedBlockSize;
+          }
+          if (bytesFreed >= bytesToFreeWithExtra) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                "Bucket cache free space completed; required: {} freed: {} 
from cold data blocks.",
+                bytesToFreeWithExtra, StringUtils.byteDesc(bytesFreed));
+            }
+            // Sufficient bytes have been freed.
+            return;
+          }
+          continue;
+        }
+
         switch (bucketEntryWithKey.getValue().getPriority()) {
           case SINGLE: {
             bucketSingle.add(bucketEntryWithKey);
@@ -1001,6 +1029,21 @@ public class BucketCache implements BlockCache, HeapSize {
         }
       }
 
+      // Check if the cold file eviction is sufficient to create enough space.
+      bytesToFreeWithExtra -= bytesFreed;
+      if (bytesToFreeWithExtra <= 0) {
+        LOG.debug("Bucket cache free space completed; freed space : {} bytes 
of cold data blocks.",
+          StringUtils.byteDesc(bytesFreed));
+        return;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Bucket cache free space completed; freed space : {} "
+            + "bytes of cold data blocks. {} more bytes required to be freed.",
+          StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra);
+      }
+
       PriorityQueue<BucketEntryGroup> bucketQueue =
         new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
 
@@ -1009,8 +1052,6 @@ public class BucketCache implements BlockCache, HeapSize {
       bucketQueue.add(bucketMemory);
 
       int remainingBuckets = bucketQueue.size();
-      long bytesFreed = 0;
-
       BucketEntryGroup bucketGroup;
       while ((bucketGroup = bucketQueue.poll()) != null) {
         long overflow = bucketGroup.overflow();
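
For readability, here is a condensed view of the flow the hunks above add to BucketCache.freeSpace(): cold files reported by DataTieringManager are evicted first, and only then does the usual single/multi/memory priority-bucket eviction run. The helper name below (evictColdBlocksFirst) is invented purely for illustration; in the actual patch the logic is inlined in freeSpace() and cold entries are additionally skipped when the priority buckets are built. All other identifiers are taken from the hunks above.

    // Illustrative sketch only, not the exact patch.
    private long evictColdBlocksFirst(long bytesToFreeWithExtra) {
      long bytesFreed = 0;
      Map<String, String> coldFiles = null;
      try {
        // Per the catch block in the patch, an uninitialised manager surfaces as
        // IllegalStateException, in which case time-based eviction is skipped.
        coldFiles = DataTieringManager.getInstance().getColdFilesList();
      } catch (IllegalStateException e) {
        // Fall back to plain priority-based eviction.
      }
      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
        if (coldFiles != null && coldFiles.containsKey(entry.getKey().getHfileName())) {
          int blockLen = entry.getValue().getLength();
          if (evictBlockIfNoRpcReferenced(entry.getKey())) {
            bytesFreed += blockLen;
          }
          if (bytesFreed >= bytesToFreeWithExtra) {
            break; // enough space reclaimed from cold blocks alone
          }
        }
      }
      return bytesFreed;
    }
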
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
index dec96604774..6c699e77c2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.OptionalLong;
@@ -173,12 +174,12 @@ public class DataTieringManager {
   private long getMaxTimestamp(Path hFilePath) throws DataTieringException {
     HStoreFile hStoreFile = getHStoreFile(hFilePath);
     if (hStoreFile == null) {
-      LOG.error("HStoreFile corresponding to " + hFilePath + " doesn't exist");
+      LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath);
       return Long.MAX_VALUE;
     }
     OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
     if (!maxTimestamp.isPresent()) {
-      LOG.error("Maximum timestamp not present for " + hFilePath);
+      LOG.error("Maximum timestamp not present for {}", hFilePath);
       return Long.MAX_VALUE;
     }
     return maxTimestamp.getAsLong();
@@ -270,4 +271,41 @@ public class DataTieringManager {
     return Long.parseLong(
       conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
   }
+
+  /**
+   * This API traverses the list of online regions and returns the names of store files
+   * that contain cold data.
+   * @return Map keyed by the names of files with cold data, as per the data-tiering logic.
+   */
+  public Map<String, String> getColdFilesList() {
+    Map<String, String> coldFiles = new HashMap<>();
+    for (HRegion r : this.onlineRegions.values()) {
+      for (HStore hStore : r.getStores()) {
+        Configuration conf = hStore.getReadOnlyConfiguration();
+        if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) {
+          // Data-Tiering not enabled for the store. Just skip it.
+          continue;
+        }
+        Long hotDataAge = getDataTieringHotDataAge(conf);
+
+        for (HStoreFile hStoreFile : hStore.getStorefiles()) {
+          String hFileName =
+            hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
+          OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
+          if (!maxTimestamp.isPresent()) {
+            LOG.warn("maxTimestamp missing for file: {}",
+              hStoreFile.getFileInfo().getActiveFileName());
+            continue;
+          }
+          long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime();
+          long fileAge = currentTimestamp - maxTimestamp.getAsLong();
+          if (fileAge > hotDataAge) {
+            // The map value is unused; only the file name (key) matters.
+            coldFiles.put(hFileName, null);
+          }
+        }
+      }
+    }
+    return coldFiles;
+  }
 }
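
The rule implemented by getColdFilesList() above is purely time-based: for a store configured with DataTieringType.TIME_RANGE, a file is treated as cold when the age of its newest cell exceeds the configured hot-data age. A minimal sketch of that predicate; the helper and its parameter names are hypothetical, chosen for illustration only:

    // Hypothetical helper mirroring the check in getColdFilesList() above.
    static boolean isColdFile(long maxCellTimestampMs, long hotDataAgeMs, long nowMs) {
      long fileAgeMs = nowMs - maxCellTimestampMs;
      // Strictly older than the hot-data window means the file is cold.
      return fileAgeMs > hotDataAgeMs;
    }
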
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 6083d872c82..5c1bf7bb913 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -373,8 +373,8 @@ public class TestPrefetch {
     Thread.sleep(20000);
     assertFalse("Prefetch threads should not be running at this point", 
reader.prefetchStarted());
     while (!reader.prefetchStarted()) {
-      assertTrue("Prefetch delay has not been expired yet",
-        getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
+      // Wait until the prefetch is triggered.
+      Thread.sleep(500);
     }
     if (reader.prefetchStarted()) {
       // Added some delay as we have started the timer a bit late.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
index 6beed9943b3..f758bb11c5b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -51,7 +52,9 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -245,6 +248,181 @@ public class TestDataTieringManager {
     }
   }
 
+  @Test
+  public void testPickColdDataFiles() {
+    Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList();
+    assertEquals(1, coldDataFiles.size());
+    // hStoreFiles[3] is the cold file.
+    assertTrue(coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName()));
+  }
+
+  /*
+   * Verify that both cold blocks are evicted when the bucket cache reaches its capacity. The
+   * hot file remains in the cache.
+   */
+  @Test
+  public void testBlockEvictions() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: create a bucket cache with a small capacity.
+    BucketCache bucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three cache keys: two blocks from a cold data file and one block from a hot file.
+    // hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block of hot data into the cache, which should trigger the eviction.
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 2 hot blocks only.
+    // Both 8KB cold blocks will be evicted to make room for one 8KB block plus some additional
+    // space.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
+  }
+
+  /*
+   * Verify that two of the three cold blocks are evicted when the bucket cache reaches its
+   * capacity, while one cold block remains in the cache since the required space has been freed.
+   */
+  @Test
+  public void testBlockEvictionsAllColdBlocks() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: create a bucket cache with a small capacity.
+    BucketCache bucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three cache keys for three cold data blocks.
+    // hStoreFiles.get(3) is a cold data file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block of hot data into the cache, which should trigger the eviction.
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 1 cold block and a newly added hot block.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
+  }
+
+  /*
+   * Verify that a hot block is evicted along with a cold block when the bucket cache reaches its capacity.
+   */
+  @Test
+  public void testBlockEvictionsHotBlocks() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: create a bucket cache with a small capacity.
+    BucketCache bucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three cache keys with two hot data blocks and one cold data block.
+    // hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block, which should evict the only cold block along with one hot block.
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 2 hot blocks.
+    // Only one of the older hot blocks is retained; the other is the newly added hot block.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
+  }
+
+  private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
+    int expectedColdBlocks) {
+    int numHotBlocks = 0, numColdBlocks = 0;
+
+    assertEquals(expectedTotalKeys, keys.size());
+    int iter = 0;
+    for (BlockCacheKey key : keys) {
+      try {
+        if (dataTieringManager.isHotData(key)) {
+          numHotBlocks++;
+        } else {
+          numColdBlocks++;
+        }
+      } catch (Exception e) {
+        fail("Unexpected exception!");
+      }
+    }
+    assertEquals(expectedHotBlocks, numHotBlocks);
+    assertEquals(expectedColdBlocks, numColdBlocks);
+  }
+
   private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
     boolean expectedResult, DataTieringException exception) {
     try {
