This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new e7d55666c26 HBASE-28836 Parallelize the file archival to improve the split times (#6616)
e7d55666c26 is described below

commit e7d55666c267c800365cb9d00cc98a71e2d51a6f
Author: Aman Poonia <[email protected]>
AuthorDate: Wed Feb 19 07:01:35 2025 +0530

    HBASE-28836 Parallelize the file archival to improve the split times (#6616)
    
    Signed-off-by: Andrew Purtell <[email protected]>
    Signed-off-by: Peter Somogyi <[email protected]>
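
The core of this change: instead of archiving a region's store files one by one,
resolveAndArchive() now separates files from directories, submits one archival
task per file to a bounded thread pool, and collects failures in a thread-safe
queue. A minimal, self-contained Java sketch of that pattern follows; it is
illustrative only (archiveOne() and the sample file names are hypothetical, not
HBase APIs):

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Queue;
  import java.util.concurrent.ConcurrentLinkedQueue;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;

  public class ParallelArchiveSketch {

    // Stand-in for the real per-file archive step (move file to archive dir).
    static boolean archiveOne(String file) {
      return !file.isEmpty(); // hypothetical success criterion
    }

    // Archive every file on a bounded pool; return the files that failed.
    static List<String> archiveAll(List<String> files, int maxThreads) {
      ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
      Queue<String> failures = new ConcurrentLinkedQueue<>();
      Map<String, Future<Boolean>> futures = new HashMap<>();
      try {
        for (String file : files) {
          futures.put(file, pool.submit(() -> archiveOne(file)));
        }
        for (Map.Entry<String, Future<Boolean>> entry : futures.entrySet()) {
          try {
            if (!entry.getValue().get()) {
              failures.add(entry.getKey()); // task ran but reported failure
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
            failures.add(entry.getKey());
          } catch (ExecutionException e) {
            failures.add(entry.getKey()); // task threw
          }
        }
      } finally {
        pool.shutdown();
      }
      return new ArrayList<>(failures);
    }

    public static void main(String[] args) {
      // Prints [] on success; any failing entry would be listed instead.
      System.out.println(archiveAll(List.of("a.hfile", "b.hfile"), 4));
    }
  }

Per-file ordering is lost, but each file either lands in the archive or is
reported in the returned failure list, matching the contract of the sequential
loop it replaces.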
---
 .../apache/hadoop/hbase/backup/HFileArchiver.java  | 149 ++++++++++++++-------
 .../master/procedure/DeleteTableProcedure.java     |   7 +-
 .../hbase/regionserver/HRegionFileSystem.java      |   2 +-
 .../hadoop/hbase/backup/TestHFileArchiving.java    |  12 +-
 4 files changed, 112 insertions(+), 58 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
index b2ea9cd33a0..b7f2f8f70ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
@@ -23,7 +23,11 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
@@ -97,7 +101,7 @@ public class HFileArchiver {
   public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info)
     throws IOException {
     Path rootDir = CommonFSUtils.getRootDir(conf);
-    archiveRegion(fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
+    archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
       FSUtils.getRegionDirFromRootDir(rootDir, info));
   }
 
@@ -113,8 +117,8 @@ public class HFileArchiver {
    *         operations could not complete.
    * @throws IOException if the request cannot be completed
    */
-  public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
-    throws IOException {
+  public static boolean archiveRegion(Configuration conf, FileSystem fs, Path rootdir,
+    Path tableDir, Path regionDir) throws IOException {
     // otherwise, we archive the files
     // make sure we can archive
     if (tableDir == null || regionDir == null) {
@@ -157,8 +161,8 @@ public class HFileArchiver {
     // convert the files in the region to a File
     Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);
     LOG.debug("Archiving " + toArchive);
-    List<File> failedArchive =
-      resolveAndArchive(fs, regionArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
+    List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive,
+      EnvironmentEdgeManager.currentTime());
     if (!failedArchive.isEmpty()) {
       throw new FailedArchiveException(
         "Failed to archive/delete all the files for region:" + 
regionDir.getName() + " into "
@@ -186,7 +190,7 @@ public class HFileArchiver {
     List<Future<Void>> futures = new ArrayList<>(regionDirList.size());
     for (Path regionDir : regionDirList) {
       Future<Void> future = getArchiveExecutor(conf).submit(() -> {
-        archiveRegion(fs, rootDir, tableDir, regionDir);
+        archiveRegion(conf, fs, rootDir, tableDir, regionDir);
         return null;
       });
       futures.add(future);
@@ -205,8 +209,8 @@ public class HFileArchiver {
   private static synchronized ThreadPoolExecutor getArchiveExecutor(final Configuration conf) {
     if (archiveExecutor == null) {
       int maxThreads = conf.getInt("hbase.hfilearchiver.thread.pool.max", 8);
-      archiveExecutor =
-        Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, getThreadFactory());
+      archiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+        getThreadFactory("HFileArchiver"));
 
       // Shutdown this ThreadPool in a shutdown hook
       Runtime.getRuntime().addShutdownHook(new Thread(() -> archiveExecutor.shutdown()));
@@ -218,13 +222,13 @@ public class HFileArchiver {
   // The difference from Threads.getNamedThreadFactory() is that it doesn't fix ThreadGroup for
   // new threads. If we use Threads.getNamedThreadFactory(), we will face ThreadGroup related
   // issues in some tests.
-  private static ThreadFactory getThreadFactory() {
+  private static ThreadFactory getThreadFactory(String archiverName) {
     return new ThreadFactory() {
       final AtomicInteger threadNumber = new AtomicInteger(1);
 
       @Override
       public Thread newThread(Runnable r) {
-        final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
+        final String name = archiverName + "-" + threadNumber.getAndIncrement();
         Thread t = new Thread(r, name);
         t.setDaemon(true);
         return t;
@@ -273,7 +277,7 @@ public class HFileArchiver {
 
     // do the actual archive
     List<File> failedArchive =
-      resolveAndArchive(fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
+      resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
     if (!failedArchive.isEmpty()) {
       throw new FailedArchiveException(
         "Failed to archive/delete all the files for region:"
@@ -296,7 +300,7 @@ public class HFileArchiver {
   public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
     Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles) throws IOException {
     Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
-    archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
+    archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir);
   }
 
   /**
@@ -327,11 +331,11 @@ public class HFileArchiver {
         "Wrong file system! Should be " + path.toUri().getScheme() + ", but 
got " + fs.getScheme());
     }
     path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, 
family);
-    archive(fs, regionInfo, family, replayedEdits, path);
+    archive(conf, fs, regionInfo, family, replayedEdits, path);
   }
 
-  private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
-    Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
+  private static void archive(Configuration conf, FileSystem fs, RegionInfo regionInfo,
+    byte[] family, Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
     // sometimes in testing, we don't have rss, so we need to check for that
     if (fs == null) {
       LOG.warn(
@@ -365,8 +369,8 @@ public class HFileArchiver {
       compactedFiles.stream().map(getStorePath).collect(Collectors.toList());
 
     // do the actual archive
-    List<File> failedArchive =
-      resolveAndArchive(fs, storeArchiveDir, storeFiles, EnvironmentEdgeManager.currentTime());
+    List<File> failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles,
+      EnvironmentEdgeManager.currentTime());
 
     if (!failedArchive.isEmpty()) {
       throw new FailedArchiveException(
@@ -419,52 +423,97 @@ public class HFileArchiver {
    * @return the list of failed to archive files.
    * @throws IOException if an unexpected file operation exception occurred
    */
-  private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
-    Collection<File> toArchive, long start) throws IOException {
-    // short circuit if no files to move
+  private static List<File> resolveAndArchive(Configuration conf, FileSystem fs,
+    Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {
+    // Early exit if no files to archive
     if (toArchive.isEmpty()) {
+      LOG.trace("No files to archive, returning an empty list.");
       return Collections.emptyList();
     }
 
-    LOG.trace("Moving files to the archive directory {}", baseArchiveDir);
+    LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir);
 
-    // make sure the archive directory exists
-    if (!fs.exists(baseArchiveDir)) {
-      if (!fs.mkdirs(baseArchiveDir)) {
-        throw new IOException("Failed to create the archive directory:" + 
baseArchiveDir
-          + ", quitting archive attempt.");
-      }
-      LOG.trace("Created archive directory {}", baseArchiveDir);
-    }
+    // Ensure the archive directory exists
+    ensureArchiveDirectoryExists(fs, baseArchiveDir);
 
-    List<File> failures = new ArrayList<>();
+    // Thread-safe collection for storing failures
+    Queue<File> failures = new ConcurrentLinkedQueue<>();
     String startTime = Long.toString(start);
+
+    // Separate files and directories for processing
+    List<File> filesOnly = new ArrayList<>();
     for (File file : toArchive) {
-      // if its a file archive it
-      try {
-        LOG.trace("Archiving {}", file);
-        if (file.isFile()) {
-          // attempt to archive the file
-          if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
-            LOG.warn("Couldn't archive " + file + " into backup directory: " + 
baseArchiveDir);
+      if (file.isFile()) {
+        filesOnly.add(file);
+      } else {
+        handleDirectory(conf, fs, baseArchiveDir, failures, file, start);
+      }
+    }
+
+    // Archive files concurrently
+    archiveFilesConcurrently(conf, baseArchiveDir, filesOnly, failures, startTime);
+
+    return new ArrayList<>(failures); // Convert to a List for the return value
+  }
+
+  private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir)
+    throws IOException {
+    if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) {
+      throw new IOException("Failed to create the archive directory: " + baseArchiveDir);
+    }
+    LOG.trace("Archive directory ready: {}", baseArchiveDir);
+  }
+
+  private static void handleDirectory(Configuration conf, FileSystem fs, Path baseArchiveDir,
+    Queue<File> failures, File directory, long start) {
+    LOG.trace("Processing directory: {}, archiving its children.", directory);
+    Path subArchiveDir = new Path(baseArchiveDir, directory.getName());
+
+    try {
+      Collection<File> children = directory.getChildren();
+      failures.addAll(resolveAndArchive(conf, fs, subArchiveDir, children, start));
+    } catch (IOException e) {
+      LOG.warn("Failed to archive directory: {}", directory, e);
+      failures.add(directory);
+    }
+  }
+
+  private static void archiveFilesConcurrently(Configuration conf, Path baseArchiveDir,
+    List<File> files, Queue<File> failures, String startTime) {
+    LOG.trace("Archiving {} files concurrently into directory: {}", 
files.size(), baseArchiveDir);
+    Map<File, Future<Boolean>> futureMap = new HashMap<>();
+    // Submit file archiving tasks
+    // the default is 16, which matches the default value of hbase.hstore.blockingStoreFiles
+    int maxThreads = conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16);
+    ThreadPoolExecutor hfilesArchiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L,
+      TimeUnit.SECONDS, getThreadFactory("HFileArchiverPerRegion-"));
+    try {
+      for (File file : files) {
+        Future<Boolean> future = hfilesArchiveExecutor
+          .submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
+        futureMap.put(file, future);
+      }
+
+      // Process results of each task
+      for (Map.Entry<File, Future<Boolean>> entry : futureMap.entrySet()) {
+        File file = entry.getKey();
+        try {
+          if (!entry.getValue().get()) {
+            LOG.warn("Failed to archive file: {} into directory: {}", file, 
baseArchiveDir);
             failures.add(file);
           }
-        } else {
-          // otherwise its a directory and we need to archive all files
-          LOG.trace("{} is a directory, archiving children files", file);
-          // so we add the directory name to the one base archive
-          Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
-          // and then get all the files from that directory and attempt to
-          // archive those too
-          Collection<File> children = file.getChildren();
-          failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
+        } catch (InterruptedException e) {
+          LOG.error("Archiving interrupted for file: {}", file, e);
+          Thread.currentThread().interrupt(); // Restore interrupt status
+          failures.add(file);
+        } catch (ExecutionException e) {
+          LOG.error("Archiving failed for file: {}", file, e);
+          failures.add(file);
         }
-      } catch (IOException e) {
-        LOG.warn("Failed to archive {}", file, e);
-        failures.add(file);
       }
+    } finally {
+      hfilesArchiveExecutor.shutdown();
     }
-    return failures;
   }
 
   /**
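
As an aside on the getThreadFactory(String) helper above: it is the standard
naming, daemon ThreadFactory pattern, kept local so that new threads are not
pinned to a fixed ThreadGroup (the comment in the diff explains why). A
standalone sketch under those assumptions (the class name is illustrative, not
HBase API):

  import java.util.concurrent.ThreadFactory;
  import java.util.concurrent.atomic.AtomicInteger;

  public class NamedDaemonThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String prefix;

    public NamedDaemonThreadFactory(String prefix) {
      this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
      // No explicit ThreadGroup: the thread simply joins the caller's group.
      Thread t = new Thread(r, prefix + "-" + threadNumber.getAndIncrement());
      t.setDaemon(true); // archiver threads must not block JVM shutdown
      return t;
    }
  }
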
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index da6ad90780d..544bb69b79f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -289,6 +290,7 @@ public class DeleteTableProcedure extends AbstractStateMachineTableProcedure<Del
     final List<RegionInfo> regions, final boolean archive) throws IOException {
     final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
     final FileSystem fs = mfs.getFileSystem();
+    final Configuration conf = env.getMasterConfiguration();
 
     final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), tableName);
 
@@ -307,8 +309,7 @@ public class DeleteTableProcedure extends AbstractStateMachineTableProcedure<Del
             }
           }
         }
-        HFileArchiver.archiveRegions(env.getMasterConfiguration(), fs, mfs.getRootDir(), tableDir,
-          regionDirList);
+        HFileArchiver.archiveRegions(conf, fs, mfs.getRootDir(), tableDir, regionDirList);
         if (!regionDirList.isEmpty()) {
           LOG.debug("Archived {} regions", tableName);
         }
@@ -319,7 +320,7 @@ public class DeleteTableProcedure extends AbstractStateMachineTableProcedure<Del
         CommonFSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), tableName);
       Path regionDir = new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
       if (fs.exists(regionDir)) {
-        HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
+        HFileArchiver.archiveRegion(conf, fs, mfs.getRootDir(), mobTableDir, regionDir);
       }
 
       // Delete table directory from FS
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index cc7b447f3de..06c36853b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -1003,7 +1003,7 @@ public class HRegionFileSystem {
 
     // Archive region
     Path rootDir = CommonFSUtils.getRootDir(conf);
-    HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir);
+    HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir);
 
     // Delete empty region dir
     if (!fs.delete(regionDir, true)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
index 4c7337e59ec..00cfebbe7b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
@@ -651,7 +651,8 @@ public class TestHFileArchiving {
 
         try {
           // Try to archive the file
-          HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir);
+          HFileArchiver.archiveRegion(conf, fs, rootDir, sourceRegionDir.getParent(),
+            sourceRegionDir);
 
           // The archiver succeeded, the file is no longer in the original location
           // but it's in the archive location.
@@ -682,12 +683,14 @@ public class TestHFileArchiving {
   public void testArchiveRegionTableAndRegionDirsNull() throws IOException {
     Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
     FileSystem fileSystem = UTIL.getTestFileSystem();
+    Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
     // Try to archive the file but with null regionDir, can't delete sourceFile
-    assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, null));
+    assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, null));
   }
 
   @Test
   public void testArchiveRegionWithTableDirNull() throws IOException {
+    Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
     Path regionDir = new Path(
       CommonFSUtils.getTableDir(new Path("./"), 
TableName.valueOf(name.getMethodName())), "xyzabc");
     Path familyDir = new Path(regionDir, "rd");
@@ -699,12 +702,13 @@ public class TestHFileArchiving {
     Path sourceRegionDir = new Path(rootDir, regionDir);
     fileSystem.mkdirs(sourceRegionDir);
     // Try to archive the file
-    assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, sourceRegionDir));
+    assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, sourceRegionDir));
     assertFalse(fileSystem.exists(sourceRegionDir));
   }
 
   @Test
   public void testArchiveRegionWithRegionDirNull() throws IOException {
+    Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
     Path regionDir =
       new Path(CommonFSUtils.getTableDir(new Path("./"), 
TableName.valueOf(name.getMethodName())),
         "elgn4nf");
@@ -718,7 +722,7 @@ public class TestHFileArchiving {
     fileSystem.mkdirs(sourceRegionDir);
     // Try to archive the file but with null regionDir, can't delete sourceFile
     assertFalse(
-      HFileArchiver.archiveRegion(fileSystem, rootDir, sourceRegionDir.getParent(), null));
+      HFileArchiver.archiveRegion(conf, fileSystem, rootDir, sourceRegionDir.getParent(), null));
     assertTrue(fileSystem.exists(sourceRegionDir));
     fileSystem.delete(sourceRegionDir, true);
   }
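
Both pool sizes are plain Configuration knobs. A hypothetical tuning snippet
follows; the values set here are illustrative, while the defaults noted in the
comments come from the patch (8 for the region-level pool, 16 for the new
per-region file pool, matching hbase.hstore.blockingStoreFiles' default):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class ArchiverPoolTuning {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Pre-existing knob: how many regions are archived in parallel (default 8).
      conf.setInt("hbase.hfilearchiver.thread.pool.max", 16);
      // New knob from this patch: how many files are archived in parallel
      // within a single region (default 16).
      conf.setInt("hbase.hfilearchiver.per.region.thread.pool.max", 32);
      System.out.println(conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16));
    }
  }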
