This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new e7d55666c26 HBASE-28836 Parallize the file archival to improve the split times (#6616)
e7d55666c26 is described below
commit e7d55666c267c800365cb9d00cc98a71e2d51a6f
Author: Aman Poonia <[email protected]>
AuthorDate: Wed Feb 19 07:01:35 2025 +0530
HBASE-28836 Parallize the file archival to improve the split times (#6616)
Signed-off-by: Andrew Purtell <[email protected]>
Signed-off-by: Peter Somogyi <[email protected]>
---
.../apache/hadoop/hbase/backup/HFileArchiver.java | 149 ++++++++++++++-------
.../master/procedure/DeleteTableProcedure.java | 7 +-
.../hbase/regionserver/HRegionFileSystem.java | 2 +-
.../hadoop/hbase/backup/TestHFileArchiving.java | 12 +-
4 files changed, 112 insertions(+), 58 deletions(-)
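For readers skimming the diff below: the heart of the change is in resolveAndArchive, which now separates a region's plain files from its subdirectories and archives the files on a bounded thread pool instead of one at a time, collecting failures in a thread-safe queue. What follows is a minimal, self-contained sketch of that fan-out/fan-in pattern, not the committed code; archiveOneFile and the String file handles are hypothetical stand-ins for resolveAndArchiveFile and HFileArchiver's File abstraction, and the fixed pool stands in for Threads.getBoundedCachedThreadPool.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelArchiveSketch {

  // Hypothetical stand-in for resolveAndArchiveFile: returns true when the
  // file was moved into the archive, false otherwise.
  static boolean archiveOneFile(String file) {
    return !file.isEmpty();
  }

  // Archive every file on a bounded pool and report the ones that failed,
  // mirroring the fan-out/fan-in shape of the patched resolveAndArchive.
  static List<String> archiveAll(List<String> files, int maxThreads) {
    Queue<String> failures = new ConcurrentLinkedQueue<>();
    ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
    try {
      Map<String, Future<Boolean>> futures = new HashMap<>();
      for (String file : files) {
        futures.put(file, pool.submit(() -> archiveOneFile(file)));
      }
      for (Map.Entry<String, Future<Boolean>> e : futures.entrySet()) {
        try {
          if (!e.getValue().get()) {
            failures.add(e.getKey()); // task ran but could not archive
          }
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // restore interrupt status
          failures.add(e.getKey());
        } catch (ExecutionException ee) {
          failures.add(e.getKey()); // task threw; count it as failed
        }
      }
    } finally {
      pool.shutdown(); // always release the pool, as the patch does
    }
    return new ArrayList<>(failures);
  }

  public static void main(String[] args) {
    System.out.println(archiveAll(List.of("a.hfile", "", "b.hfile"), 4));
  }
}

The queue-plus-futures shape lets every file be attempted even when some fail, so the caller still receives a complete failure list to wrap in FailedArchiveException.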
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
index b2ea9cd33a0..b7f2f8f70ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
@@ -23,7 +23,11 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
@@ -97,7 +101,7 @@ public class HFileArchiver {
public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info)
  throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
- archiveRegion(fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
+ archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
FSUtils.getRegionDirFromRootDir(rootDir, info));
}
@@ -113,8 +117,8 @@ public class HFileArchiver {
* operations could not complete.
* @throws IOException if the request cannot be completed
*/
- public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
- throws IOException {
+ public static boolean archiveRegion(Configuration conf, FileSystem fs, Path rootdir,
+ Path tableDir, Path regionDir) throws IOException {
// otherwise, we archive the files
// make sure we can archive
if (tableDir == null || regionDir == null) {
@@ -157,8 +161,8 @@ public class HFileArchiver {
// convert the files in the region to a File
Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);
LOG.debug("Archiving " + toArchive);
- List<File> failedArchive =
- resolveAndArchive(fs, regionArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
+ List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive,
+ EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:" +
regionDir.getName() + " into "
@@ -186,7 +190,7 @@ public class HFileArchiver {
List<Future<Void>> futures = new ArrayList<>(regionDirList.size());
for (Path regionDir : regionDirList) {
Future<Void> future = getArchiveExecutor(conf).submit(() -> {
- archiveRegion(fs, rootDir, tableDir, regionDir);
+ archiveRegion(conf, fs, rootDir, tableDir, regionDir);
return null;
});
futures.add(future);
@@ -205,8 +209,8 @@ public class HFileArchiver {
private static synchronized ThreadPoolExecutor getArchiveExecutor(final Configuration conf) {
if (archiveExecutor == null) {
int maxThreads = conf.getInt("hbase.hfilearchiver.thread.pool.max", 8);
- archiveExecutor =
- Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, getThreadFactory());
+ archiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+ getThreadFactory("HFileArchiver"));
// Shutdown this ThreadPool in a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> archiveExecutor.shutdown()));
@@ -218,13 +222,13 @@ public class HFileArchiver {
// The difference from Threads.getNamedThreadFactory() is that it doesn't fix ThreadGroup for
// new threads. If we use Threads.getNamedThreadFactory(), we will face ThreadGroup related
// issues in some tests.
- private static ThreadFactory getThreadFactory() {
+ private static ThreadFactory getThreadFactory(String archiverName) {
return new ThreadFactory() {
final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
- final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
+ final String name = archiverName + "-" + threadNumber.getAndIncrement();
Thread t = new Thread(r, name);
t.setDaemon(true);
return t;
@@ -273,7 +277,7 @@ public class HFileArchiver {
// do the actual archive
List<File> failedArchive =
- resolveAndArchive(fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
+ resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:"
@@ -296,7 +300,7 @@ public class HFileArchiver {
public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
  Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles) throws IOException {
  Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
- archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
+ archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir);
}
/**
@@ -327,11 +331,11 @@ public class HFileArchiver {
"Wrong file system! Should be " + path.toUri().getScheme() + ", but
got " + fs.getScheme());
}
path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);
- archive(fs, regionInfo, family, replayedEdits, path);
+ archive(conf, fs, regionInfo, family, replayedEdits, path);
}
- private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
- Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
+ private static void archive(Configuration conf, FileSystem fs, RegionInfo regionInfo,
+ byte[] family, Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
// sometimes in testing, we don't have rss, so we need to check for that
if (fs == null) {
LOG.warn(
@@ -365,8 +369,8 @@ public class HFileArchiver {
compactedFiles.stream().map(getStorePath).collect(Collectors.toList());
// do the actual archive
- List<File> failedArchive =
- resolveAndArchive(fs, storeArchiveDir, storeFiles, EnvironmentEdgeManager.currentTime());
+ List<File> failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles,
+ EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
@@ -419,52 +423,97 @@ public class HFileArchiver {
* @return the list of failed to archive files.
* @throws IOException if an unexpected file operation exception occurred
*/
- private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
- Collection<File> toArchive, long start) throws IOException {
- // short circuit if no files to move
+ private static List<File> resolveAndArchive(Configuration conf, FileSystem fs,
+ Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {
+ // Early exit if no files to archive
if (toArchive.isEmpty()) {
+ LOG.trace("No files to archive, returning an empty list.");
return Collections.emptyList();
}
- LOG.trace("Moving files to the archive directory {}", baseArchiveDir);
+ LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir);
- // make sure the archive directory exists
- if (!fs.exists(baseArchiveDir)) {
- if (!fs.mkdirs(baseArchiveDir)) {
- throw new IOException("Failed to create the archive directory:" + baseArchiveDir
- + ", quitting archive attempt.");
- }
- LOG.trace("Created archive directory {}", baseArchiveDir);
- }
+ // Ensure the archive directory exists
+ ensureArchiveDirectoryExists(fs, baseArchiveDir);
- List<File> failures = new ArrayList<>();
+ // Thread-safe collection for storing failures
+ Queue<File> failures = new ConcurrentLinkedQueue<>();
String startTime = Long.toString(start);
+
+ // Separate files and directories for processing
+ List<File> filesOnly = new ArrayList<>();
for (File file : toArchive) {
- // if its a file archive it
- try {
- LOG.trace("Archiving {}", file);
- if (file.isFile()) {
- // attempt to archive the file
- if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
- LOG.warn("Couldn't archive " + file + " into backup directory: " +
baseArchiveDir);
+ if (file.isFile()) {
+ filesOnly.add(file);
+ } else {
+ handleDirectory(conf, fs, baseArchiveDir, failures, file, start);
+ }
+ }
+
+ // Archive files concurrently
+ archiveFilesConcurrently(conf, baseArchiveDir, filesOnly, failures, startTime);
+
+ return new ArrayList<>(failures); // Convert to a List for the return value
+ }
+
+ private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir)
+ throws IOException {
+ if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) {
+ throw new IOException("Failed to create the archive directory: " + baseArchiveDir);
+ }
+ LOG.trace("Archive directory ready: {}", baseArchiveDir);
+ }
+
+ private static void handleDirectory(Configuration conf, FileSystem fs, Path baseArchiveDir,
+ Queue<File> failures, File directory, long start) {
+ LOG.trace("Processing directory: {}, archiving its children.", directory);
+ Path subArchiveDir = new Path(baseArchiveDir, directory.getName());
+
+ try {
+ Collection<File> children = directory.getChildren();
+ failures.addAll(resolveAndArchive(conf, fs, subArchiveDir, children, start));
+ } catch (IOException e) {
+ LOG.warn("Failed to archive directory: {}", directory, e);
+ failures.add(directory);
+ }
+ }
+
+ private static void archiveFilesConcurrently(Configuration conf, Path baseArchiveDir,
+ List<File> files, Queue<File> failures, String startTime) {
+ LOG.trace("Archiving {} files concurrently into directory: {}",
files.size(), baseArchiveDir);
+ Map<File, Future<Boolean>> futureMap = new HashMap<>();
+ // Submit file archiving tasks
+ // default is 16, which matches the default value of hbase.hstore.blockingStoreFiles
+ int maxThreads = conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16);
+ ThreadPoolExecutor hfilesArchiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L,
+ TimeUnit.SECONDS, getThreadFactory("HFileArchiverPerRegion-"));
+ try {
+ for (File file : files) {
+ Future<Boolean> future = hfilesArchiveExecutor
+ .submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
+ futureMap.put(file, future);
+ }
+
+ // Process results of each task
+ for (Map.Entry<File, Future<Boolean>> entry : futureMap.entrySet()) {
+ File file = entry.getKey();
+ try {
+ if (!entry.getValue().get()) {
+ LOG.warn("Failed to archive file: {} into directory: {}", file,
baseArchiveDir);
failures.add(file);
}
- } else {
- // otherwise its a directory and we need to archive all files
- LOG.trace("{} is a directory, archiving children files", file);
- // so we add the directory name to the one base archive
- Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
- // and then get all the files from that directory and attempt to
- // archive those too
- Collection<File> children = file.getChildren();
- failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
+ } catch (InterruptedException e) {
+ LOG.error("Archiving interrupted for file: {}", file, e);
+ Thread.currentThread().interrupt(); // Restore interrupt status
+ failures.add(file);
+ } catch (ExecutionException e) {
+ LOG.error("Archiving failed for file: {}", file, e);
+ failures.add(file);
}
- } catch (IOException e) {
- LOG.warn("Failed to archive {}", file, e);
- failures.add(file);
}
+ } finally {
+ hfilesArchiveExecutor.shutdown();
}
- return failures;
}
/**
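A tuning note on the two knobs the diff above touches (not part of the commit itself): hbase.hfilearchiver.thread.pool.max (default 8) still bounds how many regions are archived in parallel, while the new hbase.hfilearchiver.per.region.thread.pool.max (default 16) bounds how many files are archived in parallel within a single region; as the diff reads, the per-region pool is created and shut down per resolveAndArchive call, so in the worst case roughly the product of the two values in archive threads can be live at once. Below is a minimal sketch of setting the knobs programmatically with the standard Hadoop Configuration API; in a real deployment they would normally go in hbase-site.xml, and the value 32 is only an illustrative choice, not a recommendation.

import org.apache.hadoop.conf.Configuration;

public class ArchiverPoolTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Pre-existing knob: regions archived concurrently (default 8).
    conf.setInt("hbase.hfilearchiver.thread.pool.max", 8);
    // New knob from this change: files archived concurrently per region
    // (default 16, matching the hbase.hstore.blockingStoreFiles default).
    // 32 is an illustrative value only.
    conf.setInt("hbase.hfilearchiver.per.region.thread.pool.max", 32);
    System.out.println(conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16));
  }
}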
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index da6ad90780d..544bb69b79f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -289,6 +290,7 @@ public class DeleteTableProcedure extends AbstractStateMachineTableProcedure<Del
final List<RegionInfo> regions, final boolean archive) throws IOException {
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final FileSystem fs = mfs.getFileSystem();
+ final Configuration conf = env.getMasterConfiguration();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), tableName);
@@ -307,8 +309,7 @@ public class DeleteTableProcedure extends AbstractStateMachineTableProcedure<Del
}
}
}
- HFileArchiver.archiveRegions(env.getMasterConfiguration(), fs, mfs.getRootDir(), tableDir,
- regionDirList);
+ HFileArchiver.archiveRegions(conf, fs, mfs.getRootDir(), tableDir, regionDirList);
if (!regionDirList.isEmpty()) {
LOG.debug("Archived {} regions", tableName);
}
@@ -319,7 +320,7 @@ public class DeleteTableProcedure extends AbstractStateMachineTableProcedure<Del
CommonFSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), tableName);
Path regionDir = new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
if (fs.exists(regionDir)) {
- HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
+ HFileArchiver.archiveRegion(conf, fs, mfs.getRootDir(), mobTableDir, regionDir);
}
// Delete table directory from FS
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index cc7b447f3de..06c36853b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -1003,7 +1003,7 @@ public class HRegionFileSystem {
// Archive region
Path rootDir = CommonFSUtils.getRootDir(conf);
- HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir);
+ HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir);
// Delete empty region dir
if (!fs.delete(regionDir, true)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
index 4c7337e59ec..00cfebbe7b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
@@ -651,7 +651,8 @@ public class TestHFileArchiving {
try {
// Try to archive the file
- HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir);
+ HFileArchiver.archiveRegion(conf, fs, rootDir, sourceRegionDir.getParent(),
+ sourceRegionDir);
// The archiver succeeded, the file is no longer in the original location
// but it's in the archive location.
@@ -682,12 +683,14 @@ public class TestHFileArchiving {
public void testArchiveRegionTableAndRegionDirsNull() throws IOException {
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
FileSystem fileSystem = UTIL.getTestFileSystem();
+ Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
// Try to archive the file but with null regionDir, can't delete sourceFile
- assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, null));
+ assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, null));
}
@Test
public void testArchiveRegionWithTableDirNull() throws IOException {
+ Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path regionDir = new Path(
CommonFSUtils.getTableDir(new Path("./"),
TableName.valueOf(name.getMethodName())), "xyzabc");
Path familyDir = new Path(regionDir, "rd");
@@ -699,12 +702,13 @@ public class TestHFileArchiving {
Path sourceRegionDir = new Path(rootDir, regionDir);
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file
- assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, sourceRegionDir));
+ assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, sourceRegionDir));
assertFalse(fileSystem.exists(sourceRegionDir));
}
@Test
public void testArchiveRegionWithRegionDirNull() throws IOException {
+ Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path regionDir =
new Path(CommonFSUtils.getTableDir(new Path("./"),
TableName.valueOf(name.getMethodName())),
"elgn4nf");
@@ -718,7 +722,7 @@ public class TestHFileArchiving {
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file but with null regionDir, can't delete sourceFile
assertFalse(
- HFileArchiver.archiveRegion(fileSystem, rootDir, sourceRegionDir.getParent(), null));
+ HFileArchiver.archiveRegion(conf, fileSystem, rootDir, sourceRegionDir.getParent(), null));
assertTrue(fileSystem.exists(sourceRegionDir));
fileSystem.delete(sourceRegionDir, true);
}