gvprathyusha6 commented on code in PR #5487:
URL: https://github.com/apache/hbase/pull/5487#discussion_r1437421830
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java:
##########
@@ -573,15 +585,57 @@ private void createMergedRegion(final MasterProcedureEnv
env) throws IOException
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(),
regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();
- List<Path> mergedFiles = new ArrayList<>();
HRegionFileSystem mergeRegionFs = HRegionFileSystem
.createRegionOnFileSystem(env.getMasterConfiguration(), fs, tableDir,
mergedRegion);
+ Configuration conf = env.getMasterConfiguration();
+ int numOfThreads = conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
+ conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
HStore.DEFAULT_BLOCKING_STOREFILE_COUNT));
+ List<Path> mergedFiles = new ArrayList<Path>();
+ final ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads,
+ new
ThreadFactoryBuilder().setNameFormat("StoreFileMerge-pool-%d").setDaemon(true)
+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
+ final List<Future<Path>> futures = new
ArrayList<Future<Path>>();
for (RegionInfo ri : this.regionsToMerge) {
HRegionFileSystem regionFs = HRegionFileSystem
.openRegionFromFileSystem(env.getMasterConfiguration(), fs, tableDir,
ri, false);
- mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs,
mergedRegion));
+ mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion, threadPool,
futures);
}
+ // Shutdown the pool
+ threadPool.shutdown();
+
+ // Wait for all the tasks to finish.
+ // When splits ran on the RegionServer, how-long-to-wait-configuration was
named
+ // hbase.regionserver.fileSplitTimeout. If set, use its value.
+ long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
+ conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
+ try {
+ boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout,
TimeUnit.MILLISECONDS);
+ if (stillRunning) {
+ threadPool.shutdownNow();
+ // wait for the thread to shutdown completely.
+ while (!threadPool.isTerminated()) {
+ Thread.sleep(50);
+ }
+ throw new IOException(
+ "Took too long to merge the" + " files and create the references,
aborting merge");
+ }
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ }
+
+ for (Future<Path> future : futures) {
+ try {
+ Path path = future.get();
+ if (path != null) {
+ mergedFiles.add(path);
+ }
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new
InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ }
Review Comment:
@wchevreuil
Moved common code to a Util file, will it be better to move that to
hbase-common module instead of hbase-server?, there is one
[ThreadUtil](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java)
class in hadoop-common, should I be using that instead? thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]