wchevreuil commented on code in PR #5487:
URL: https://github.com/apache/hbase/pull/5487#discussion_r1378231973
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java:
##########
@@ -573,15 +585,57 @@ private void createMergedRegion(final MasterProcedureEnv
env) throws IOException
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(),
regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();
- List<Path> mergedFiles = new ArrayList<>();
HRegionFileSystem mergeRegionFs = HRegionFileSystem
.createRegionOnFileSystem(env.getMasterConfiguration(), fs, tableDir,
mergedRegion);
+ Configuration conf = env.getMasterConfiguration();
+ int numOfThreads = conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
+ conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
HStore.DEFAULT_BLOCKING_STOREFILE_COUNT));
+ List<Path> mergedFiles = new ArrayList<Path>();
+ final ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads,
+ new
ThreadFactoryBuilder().setNameFormat("StoreFileMerge-pool-%d").setDaemon(true)
+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
+ final List<Future<Path>> futures = new
ArrayList<Future<Path>>();
for (RegionInfo ri : this.regionsToMerge) {
HRegionFileSystem regionFs = HRegionFileSystem
.openRegionFromFileSystem(env.getMasterConfiguration(), fs, tableDir,
ri, false);
- mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs,
mergedRegion));
+ mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion, threadPool,
futures);
}
+ // Shutdown the pool
+ threadPool.shutdown();
+
+ // Wait for all the tasks to finish.
+ // When splits ran on the RegionServer, how-long-to-wait-configuration was
named
+ // hbase.regionserver.fileSplitTimeout. If set, use its value.
+ long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
+ conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
+ try {
+ boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout,
TimeUnit.MILLISECONDS);
+ if (stillRunning) {
+ threadPool.shutdownNow();
+ // wait for the thread to shutdown completely.
+ while (!threadPool.isTerminated()) {
+ Thread.sleep(50);
+ }
+ throw new IOException(
+ "Took too long to merge the" + " files and create the references,
aborting merge");
+ }
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ }
+
+ for (Future<Path> future : futures) {
+ try {
+ Path path = future.get();
+ if (path != null) {
+ mergedFiles.add(path);
+ }
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new
InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ }
Review Comment:
This whole block duplicates a lot of the logic already in SplitTableRegionProcedure.
We should extract the reusable parts into a common method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]