This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch HBASE-27389
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-27389 by this push:
     new 3a2333e6b51 HBASE-27997 Enhance prefetch executor to record region prefetch infor… (#5339)
3a2333e6b51 is described below

commit 3a2333e6b51b1a93d75e3a2a7b91ec9b31643362
Author: Rahul Agarkar <ragar...@cloudera.com>
AuthorDate: Wed Aug 2 23:06:30 2023 +0530

    HBASE-27997 Enhance prefetch executor to record region prefetch infor… (#5339)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
    Reviewed-by: Kota-SH <shanmukhaharipr...@gmail.com>
---
 .../src/main/protobuf/PrefetchPersistence.proto    |  7 ++-
 .../hadoop/hbase/io/hfile/HFilePreadReader.java    | 12 +++-
 .../hadoop/hbase/io/hfile/PrefetchExecutor.java    | 71 ++++++++++++++++++++--
 .../hadoop/hbase/io/hfile/PrefetchProtoUtils.java  | 26 +++++++-
 .../io/hfile/bucket/TestBucketCachePersister.java  | 34 ++++++++---
 .../io/hfile/bucket/TestPrefetchPersistence.java   | 31 ++++++++--
 6 files changed, 159 insertions(+), 22 deletions(-)
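
For readers skimming the patch: the change keeps two pieces of in-memory bookkeeping in
PrefetchExecutor. Each completed prefetch records its hFile against a (region, file size) pair,
and a second map aggregates the total prefetched bytes per region, exposed read-only through
getRegionPrefetchInfo(). The snippet below is a minimal standalone sketch of that accounting; it
uses plain JDK types instead of HBase's Pair and folds the zero-size cleanup into the remapping
lambda, so it is an illustration of the idea rather than the committed class.

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the per-region prefetch accounting introduced by this change.
// Not the committed PrefetchExecutor: persistence, scheduling and HBase's Pair type are omitted.
public final class RegionPrefetchAccountingSketch {
  // hFile name -> (encoded region name, bytes prefetched from that file)
  private final Map<String, Entry<String, Long>> prefetchCompleted = new ConcurrentHashMap<>();
  // encoded region name -> total bytes prefetched for the region on this server
  private final Map<String, Long> regionPrefetchSizeMap = new ConcurrentHashMap<>();

  // Corresponds to PrefetchExecutor.complete(regionName, path, size)
  public void complete(String regionName, String hFileName, long size) {
    prefetchCompleted.put(hFileName, new SimpleImmutableEntry<>(regionName, size));
    regionPrefetchSizeMap.merge(regionName, size, Long::sum);
  }

  // Corresponds to removeFileFromPrefetch(): subtract the file's contribution and
  // drop the region entry once nothing prefetched remains for that region.
  public void evict(String hFileName) {
    Entry<String, Long> regionEntry = prefetchCompleted.remove(hFileName);
    if (regionEntry != null) {
      regionPrefetchSizeMap.computeIfPresent(regionEntry.getKey(), (region, total) -> {
        long remaining = total - regionEntry.getValue();
        return remaining <= 0 ? null : remaining;
      });
    }
  }

  // Corresponds to getRegionPrefetchInfo(): read-only view for callers.
  public Map<String, Long> getRegionPrefetchInfo() {
    return Collections.unmodifiableMap(regionPrefetchSizeMap);
  }
}
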

diff --git a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto
index d1a2b4cfd1b..a024b94baa6 100644
--- a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto
@@ -27,5 +27,10 @@ option optimize_for = SPEED;
 
 
 message PrefetchedHfileName {
-  map<string, bool> prefetched_files = 1;
+  map<string, RegionFileSizeMap> prefetched_files = 1;
+}
+
+message RegionFileSizeMap {
+  required string region_name = 1;
+  required uint64 region_prefetch_size = 2;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 2c71ce9f484..91fe3066c1e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -77,6 +77,8 @@ public class HFilePreadReader extends HFileReaderImpl {
                 block.release();
               }
             }
+            String regionName = getRegionName(path);
+            PrefetchExecutor.complete(regionName, path, offset);
           } catch (IOException e) {
            // IOExceptions are probably due to region closes (relocation, etc.)
             if (LOG.isTraceEnabled()) {
@@ -93,13 +95,21 @@ public class HFilePreadReader extends HFileReaderImpl {
                 LOG.warn("Close prefetch stream reader failed, path: " + path, 
e);
               }
             }
-            PrefetchExecutor.complete(path);
           }
         }
       });
     }
   }
 
+  /*
+   * Get the region name for the given file path. A HFile is always kept under the <region>/<column
+   * family>/<hfile>. To find the region for a given hFile, just find the name of the grandparent
+   * directory.
+   */
+  private static String getRegionName(Path path) {
+    return path.getParent().getParent().getName();
+  }
+
   private static String getPathOffsetEndStr(final Path path, final long offset, final long end) {
     return "path=" + path.toString() + ", offset=" + offset + ", end=" + end;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
index d3064e066a1..3a0629a59c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +57,16 @@ public final class PrefetchExecutor {
   private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
   /** Set of files for which prefetch is completed */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
-  private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
+  /**
+   * Map of region -> total size of the region prefetched on this region server. This is the total
+   * size of hFiles for this region prefetched on this region server
+   */
+  private static Map<String, Long> regionPrefetchSizeMap = new ConcurrentHashMap<>();
+  /**
+   * Map of hFile -> Region -> File size. This map is used by the prefetch executor while caching or
+   * evicting individual hFiles.
+   */
+  private static Map<String, Pair<String, Long>> prefetchCompleted = new HashMap<>();
   /** Executor pool shared among all HFiles for block prefetch */
   private static final ScheduledExecutorService prefetchExecutorPool;
   /** Delay before beginning prefetch */
@@ -120,9 +133,30 @@ public final class PrefetchExecutor {
     }
   }
 
-  public static void complete(Path path) {
+  private static void removeFileFromPrefetch(String hFileName) {
+    // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted
+    if (prefetchCompleted.containsKey(hFileName)) {
+      Pair<String, Long> regionEntry = prefetchCompleted.get(hFileName);
+      String regionEncodedName = regionEntry.getFirst();
+      long filePrefetchedSize = regionEntry.getSecond();
+      LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName);
+      regionPrefetchSizeMap.computeIfPresent(regionEncodedName,
+        (rn, pf) -> pf - filePrefetchedSize);
+      // If all the blocks for a region are evicted from the cache, remove the entry for that region
+      if (
+        regionPrefetchSizeMap.containsKey(regionEncodedName)
+          && regionPrefetchSizeMap.get(regionEncodedName) == 0
+      ) {
+        regionPrefetchSizeMap.remove(regionEncodedName);
+      }
+    }
+    prefetchCompleted.remove(hFileName);
+  }
+
+  public static void complete(final String regionName, Path path, long size) {
     prefetchFutures.remove(path);
-    prefetchCompleted.put(path.getName(), true);
+    prefetchCompleted.put(path.getName(), new Pair<>(regionName, size));
+    regionPrefetchSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize);
     LOG.debug("Prefetch completed for {}", path.getName());
   }
 
@@ -173,11 +207,25 @@ public final class PrefetchExecutor {
     try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
       PersistentPrefetchProtos.PrefetchedHfileName proto =
         PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
-      Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
-      prefetchCompleted.putAll(protoPrefetchedFilesMap);
+      Map<String, PersistentPrefetchProtos.RegionFileSizeMap> protoPrefetchedFilesMap =
+        proto.getPrefetchedFilesMap();
+      prefetchCompleted.putAll(PrefetchProtoUtils.fromPB(protoPrefetchedFilesMap));
+      updateRegionSizeMapWhileRetrievingFromFile();
     }
   }
 
+  private static void updateRegionSizeMapWhileRetrievingFromFile() {
+    // Update the regionPrefetchedSizeMap with the region size while restarting the region server
+    LOG.debug("Updating region size map after retrieving prefetch file list");
+    prefetchCompleted.forEach((hFileName, hFileSize) -> {
+      // Get the region name for each file
+      String regionEncodedName = hFileSize.getFirst();
+      long filePrefetchSize = hFileSize.getSecond();
+      regionPrefetchSizeMap.merge(regionEncodedName, filePrefetchSize,
+        (oldpf, fileSize) -> oldpf + fileSize);
+    });
+  }
+
   private static FileInputStream deleteFileOnClose(final File file) throws IOException {
     return new FileInputStream(file) {
       private File myFile;
@@ -203,13 +251,24 @@ public final class PrefetchExecutor {
   }
 
   public static void removePrefetchedFileWhileEvict(String hfileName) {
-    prefetchCompleted.remove(hfileName);
+    removeFileFromPrefetch(hfileName);
   }
 
   public static boolean isFilePrefetched(String hfileName) {
     return prefetchCompleted.containsKey(hfileName);
   }
 
+  public static Map<String, Long> getRegionPrefetchInfo() {
+    return Collections.unmodifiableMap(regionPrefetchSizeMap);
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*(/src/test/.*|PrefetchExecutor).java")
+  public static void reset() {
+    prefetchCompleted = new HashMap<>();
+    regionPrefetchSizeMap = new ConcurrentHashMap<>();
+  }
+
   private PrefetchExecutor() {
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java
index e75e8a6a652..df67e4429a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.util.HashMap;
 import java.util.Map;
+import org.apache.hadoop.hbase.util.Pair;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
 
@@ -26,8 +28,26 @@ final class PrefetchProtoUtils {
   }
 
   static PersistentPrefetchProtos.PrefetchedHfileName
-    toPB(Map<String, Boolean> prefetchedHfileNames) {
-    return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
-      .putAllPrefetchedFiles(prefetchedHfileNames).build();
+    toPB(Map<String, Pair<String, Long>> prefetchedHfileNames) {
+    Map<String, PersistentPrefetchProtos.RegionFileSizeMap> tmpMap = new HashMap<>();
+    prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> {
+      PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize =
+        PersistentPrefetchProtos.RegionFileSizeMap.newBuilder()
+          .setRegionName(regionPrefetchMap.getFirst())
+          .setRegionPrefetchSize(regionPrefetchMap.getSecond()).build();
+      tmpMap.put(hFileName, tmpRegionFileSize);
+    });
+    return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap)
+      .build();
+  }
+
+  static Map<String, Pair<String, Long>>
+    fromPB(Map<String, PersistentPrefetchProtos.RegionFileSizeMap> prefetchHFileNames) {
+    Map<String, Pair<String, Long>> hFileMap = new HashMap<>();
+    prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> {
+      hFileMap.put(hFileName,
+        new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionPrefetchSize()));
+    });
+    return hFileMap;
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
index dbd3d7f8664..bf44aff1643 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -106,8 +107,8 @@ public class TestBucketCachePersister {
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Cache
-    Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs);
-    Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs);
+    Path storeFile = writeStoreFile("Region0", "TestPrefetch0", conf, 
cacheConf, fs);
+    Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1", conf, 
cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
     readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
     Thread.sleep(bucketCachePersistInterval);
@@ -126,7 +127,7 @@ public class TestBucketCachePersister {
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Cache
-    Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
+    Path storeFile = writeStoreFile("Region2", "TestPrefetch2", conf, 
cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
     assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertFalse(new File(testDir + "/bucket.persistence").exists());
@@ -140,14 +141,18 @@ public class TestBucketCachePersister {
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache1);
     FileSystem fs = HFileSystem.get(conf);
     // Load Blocks in cache
-    Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
+    Path storeFile = writeStoreFile("Region3", "TestPrefetch3", conf, 
cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1);
     Thread.sleep(500);
     // Evict Blocks from cache
     BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey();
     assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
+    int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize > 0);
     bucketCache1.evictBlock(bucketCacheKey);
     assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
+    int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
   }
 
   public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf,
@@ -172,9 +177,12 @@ public class TestBucketCachePersister {
     }
   }
 
-  public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs)
-    throws IOException {
-    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+  public Path writeStoreFile(String regionName, String fname, Configuration conf,
+    CacheConfig cacheConf, FileSystem fs) throws IOException {
+    // Create store files as per the following directory structure
+    // <region name>/<column family>/<hFile>
+    Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
+    Path storeFileParentDir = new Path(regionDir, fname);
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
       .withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -190,6 +198,18 @@ public class TestBucketCachePersister {
     }
 
     sfw.close();
+
+    // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region
+    // name to be added to the prefetch file list
+    Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
+    File regionInfoFile = new File(regionInfoFilePath.toString());
+    try {
+      if (!regionInfoFile.createNewFile()) {
+        assertFalse("Unable to create .regioninfo file", true);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     return sfw.getPath();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
index 771ab0158f6..843cf800089 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -123,14 +124,16 @@ public class TestPrefetchPersistence {
     assertEquals(0, usedSize);
     assertTrue(new File(testDir + "/bucket.cache").exists());
     // Load Cache
-    Path storeFile = writeStoreFile("TestPrefetch0");
-    Path storeFile2 = writeStoreFile("TestPrefetch1");
+    Path storeFile = writeStoreFile("Region0", "TestPrefetch0");
+    Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1");
     readStoreFile(storeFile, 0);
     readStoreFile(storeFile2, 0);
     usedSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedSize);
 
     bucketCache.shutdown();
+    // Reset the info maintained in PrefetchExecutor
+    PrefetchExecutor.reset();
     assertTrue(new File(testDir + "/bucket.persistence").exists());
     assertTrue(new File(testDir + "/prefetch.persistence").exists());
     bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
@@ -149,8 +152,12 @@ public class TestPrefetchPersistence {
   public void closeStoreFile(Path path) throws Exception {
     HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
     assertTrue(PrefetchExecutor.isFilePrefetched(path.getName()));
+    int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize > 0);
     reader.close(true);
     assertFalse(PrefetchExecutor.isFilePrefetched(path.getName()));
+    int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
   }
 
   public void readStoreFile(Path storeFilePath, long offset) throws Exception {
@@ -174,8 +181,11 @@ public class TestPrefetchPersistence {
     }
   }
 
-  public Path writeStoreFile(String fname) throws IOException {
-    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+  public Path writeStoreFile(String regionName, String fname) throws IOException {
+    // Create store files as per the following directory structure
+    // <region name>/<column family>/<hFile>
+    Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
+    Path storeFileParentDir = new Path(regionDir, fname);
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
       .withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -191,6 +201,19 @@ public class TestPrefetchPersistence {
     }
 
     sfw.close();
+
+    // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region name
+    // to be added to the prefetch file list
+    Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
+    File regionInfoFile = new File(regionInfoFilePath.toString());
+    LOG.info("Create file: {}", regionInfoFilePath);
+    try {
+      if (!regionInfoFile.exists() && !regionInfoFile.createNewFile()) {
+        assertFalse("Unable to create .regioninfo file", true);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     return sfw.getPath();
   }
 
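
As the test assertions above exercise, the per-region totals are now observable through the new
PrefetchExecutor.getRegionPrefetchInfo() accessor. The short sketch below shows how a caller might
read that view; the class name and the printing are illustrative only and are not part of this
commit.

import java.util.Map;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;

// Illustrative (hypothetical) consumer of the read-only per-region prefetch view.
public final class PrefetchInfoDump {
  public static void main(String[] args) {
    // Unmodifiable snapshot: encoded region name -> bytes prefetched on this region server.
    Map<String, Long> regionPrefetchInfo = PrefetchExecutor.getRegionPrefetchInfo();
    regionPrefetchInfo.forEach((encodedRegionName, prefetchedBytes) -> System.out
      .println(encodedRegionName + " => " + prefetchedBytes + " bytes prefetched"));
  }
}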
