nyl3532016 commented on a change in pull request #3425:
URL: https://github.com/apache/hbase/pull/3425#discussion_r663465541



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
##########
@@ -3540,6 +3546,13 @@ public ThroughputController getFlushThroughputController() {
     return flushThroughputController;
   }
 
+  /**
+   * @return the flush pressure of all stores on this regionserver. The value should be greater than
+   *         or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that
+   *         global memstore size already exceeds lower limit.
+   * @deprecated Since 2.0.0

Review comment:
       HBASE-15787 moved the global memstore size accounting into `RegionServerAccounting`, and `RegionServerAccounting` already has a `getFlushPressure()` method; the intent was to run `PressureAwareFlushThroughputController` directly against `RegionServerAccounting`, so HRegionServer's `getFlushPressure()` was deprecated. I don't think that is correct, though, because `PressureAwareFlushThroughputController` needs a chore service for its setup. Let me remove the deprecated javadoc.
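
       For context, the controller needs a chore service because it periodically re-reads the current pressure and retunes its throughput limit. Below is a rough, self-contained sketch of that shape; the class, method names, and numbers are illustrative only, not the actual `PressureAwareFlushThroughputController` code.

```java
// Illustrative sketch only -- NOT the HBase implementation. It shows why a
// pressure-aware throughput controller needs a periodic task ("chore"):
// without one, the limit computed at construction time would never be retuned.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;

final class PressureTunedLimitSketch {
  private final DoubleSupplier pressureSupplier; // e.g. flush pressure, >= 0.0
  private final long maxBytesPerSec;
  private volatile long currentBytesPerSec;

  PressureTunedLimitSketch(DoubleSupplier pressureSupplier, long maxBytesPerSec) {
    this.pressureSupplier = pressureSupplier;
    this.maxBytesPerSec = maxBytesPerSec;
    this.currentBytesPerSec = maxBytesPerSec;
  }

  /** Re-read the pressure and adjust the limit; must be driven by a scheduler. */
  void tune() {
    double pressure = pressureSupplier.getAsDouble();
    // pressure >= 1.0 would be the emergency state: stop throttling entirely
    currentBytesPerSec = pressure >= 1.0
        ? Long.MAX_VALUE
        : (long) (maxBytesPerSec * (0.5 + 0.5 * pressure));
  }

  long getCurrentBytesPerSec() {
    return currentBytesPerSec;
  }

  public static void main(String[] args) {
    ScheduledExecutorService chore = Executors.newSingleThreadScheduledExecutor();
    PressureTunedLimitSketch limit =
        new PressureTunedLimitSketch(() -> 0.3 /* fake pressure */, 100L * 1024 * 1024);
    // the "chore service" role: retune once a minute
    chore.scheduleAtFixedRate(limit::tune, 0, 60, TimeUnit.SECONDS);
  }
}
```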

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,446 @@
 package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuse {@link HStore#selectCompaction}, {@link HStore#throttleCompaction},
+ * {@link CompactionContext#compact}, {@link CompactThreadControl}, which are core logic of
+ * compaction.
+ */
 @InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
+  private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+  // Configuration key for the large compaction threads.
+  private final static String LARGE_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.large";
+  private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+  // Configuration key for the small compaction threads.
+  private final static String SMALL_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.small";
+  private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
 
   private final Configuration conf;
-  private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
-      new ConcurrentHashMap<>();
   private final HCompactionServer server;
+  private HFileSystem fs;
+  private Path rootDir;
+  private FSTableDescriptors tableDescriptors;
+  private CompactThreadControl compactThreadControl;
+  private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+      new ConcurrentHashMap<>();
+  private static CompactionFilesCache compactionFilesCache = new CompactionFilesCache();
 
-  public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+  CompactionThreadManager(final Configuration conf, HCompactionServer server) {
     this.conf = conf;
     this.server = server;
+    try {
+      this.fs = new HFileSystem(this.conf, true);
+      this.rootDir = CommonFSUtils.getRootDir(this.conf);
+      this.tableDescriptors = new FSTableDescriptors(conf);
+      int largeThreads =
+          Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+      int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+      compactThreadControl = new CompactThreadControl(this, largeThreads, smallThreads,
+          COMPACTION_TASK_COMPARATOR, REJECTION);
+    } catch (Throwable t) {
+      LOG.error("Failed construction CompactionThreadManager", t);
+    }
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return server.getChoreService();
+  }
+
+  @Override
+  public double getCompactionPressure() {
+    double max = 0;
+    for (CompactionTask task : getRunningCompactionTasks().values()) {
+      double normCount = task.getStore().getCompactionPressure();
+      if (normCount > max) {
+        max = normCount;
+      }
+    }
+    return max;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
+
+  public void requestCompaction(CompactionTask compactionTask) {
+    try {
+      selectFileAndExecuteTask(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Failed requestCompaction {}", compactionTask, e);
+    }
+  }
+
+  private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+    ServerName rsServerName = compactionTask.getRsServerName();
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    String logStr = compactionTask.toString();
+    MonitoredTask status =
+        TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+            + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+    status.enableStatusJournal(false);
+    // 1. select compaction and check compaction context is present
+    LOG.info("Start select compaction {}", compactionTask);
+    status.setStatus("Start select compaction");
+    HStore store;
+    CompactionContext compactionContext;
+    Pair<Boolean, List<String>> updateSelectedFilesCacheResult;
+    // the synchronized ensure file in store, selected files in cache, compacted files in cache,
+    // the three has consistent state, we need this condition to guarantee correct selection
+    synchronized (compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
+      synchronized (compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
+        Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+          compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+        store = pair.getFirst();
+        Optional<CompactionContext> compaction = pair.getSecond();
+        if (!compaction.isPresent()) {
+          store.close();
+          LOG.info("Compaction context is empty: {}", compactionTask);
+          status.abort("Compaction context is empty and return");
+          return;
+        }
+        compactionContext = compaction.get();
+        // 2. update compactionFilesCache
+        updateSelectedFilesCacheResult =
+            updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+      } // end of synchronized selected files
+    } // end of synchronized compacted files
+    if (!updateSelectedFilesCacheResult.getFirst()) {
+      store.close();
+      return;
+    }
+    List<String> selectedFileNames = updateSelectedFilesCacheResult.getSecond();
+    compactionTask.setHStore(store);
+    compactionTask.setCompactionContext(compactionContext);
+    compactionTask.setSelectedFileNames(selectedFileNames);
+    compactionTask.setMonitoredTask(status);
+    compactionTask.setPriority(compactionContext.getRequest().getPriority());
+    // 3. execute a compaction task
+    ThreadPoolExecutor pool;
+    pool = store.throttleCompaction(compactionContext.getRequest().getSize())
+        ? compactThreadControl.getLongCompactions()
+        : compactThreadControl.getShortCompactions();
+    pool.submit(new CompactionTaskRunner(compactionTask));
+  }
+
+  /**
+   * Open store, and select compaction context
+   * @return Store and CompactionContext
+   */
+  Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+      throws IOException {
+    status.setStatus("Open store");
+    tableDescriptors.get(regionInfo.getTable());
+    HStore store = getStore(conf, fs, rootDir, tableDescriptors.get(regionInfo.getTable()),
+      regionInfo, cfd.getNameAsString());
+
+    // CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
+    // opportunity to clean compacted file at that time, we clean compacted files here
+    compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
+      store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
+    if (major) {
+      status.setStatus("Trigger major compaction");
+      store.triggerMajorCompaction();
+    }
+    // get current compacting and compacted files, NOTE: these files are file names only, don't
+    // include paths.
+    status.setStatus("Get current compacting and compacted files from compactionFilesCache");
+    Set<String> compactingFiles = compactionFilesCache.getSelectedStoreFiles(regionInfo, cfd);
+    Set<String> compactedFiles = compactionFilesCache.getCompactedStoreFiles(regionInfo, cfd);
+    Set<String> excludeFiles = new HashSet<>(compactingFiles);
+    excludeFiles.addAll(compactedFiles);
+    // Convert files names to store files
+    status.setStatus("Convert current compacting and compacted files to store 
files");
+    List<HStoreFile> excludeStoreFiles = getExcludedStoreFiles(store, 
excludeFiles);
+    LOG.info(
+      "Start select store: {}, excludeFileNames: {}, excluded: {}, compacting: 
{}, compacted: {}",
+      logStr, excludeFiles.size(), excludeStoreFiles.size(), 
compactingFiles.size(),
+      compactedFiles.size());
+    status.setStatus("Select store files to compaction, major: " + major);
+    Optional<CompactionContext> compaction = store.selectCompaction(priority,
+      CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
+    LOG.info("After select store: {}, if compaction context is present: {}", 
logStr,
+      compaction.isPresent());
+    return new Pair<>(store, compaction);
+  }
+
+  /**
+   * Mark files in compaction context as selected in compactionFilesCache
+   * @return True if success, otherwise if files are already in selected compactionFilesCache
+   */
+  private Pair<Boolean, List<String>> updateStorageAfterSelectCompaction(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd, CompactionContext compactionContext, MonitoredTask status,
+      String logStr) {
+    LOG.info("Start update compactionFilesCache after select compaction: {}", 
logStr);
+    // save selected files to compactionFilesCache
+    List<String> selectedFilesNames = new ArrayList<>();
+    for (HStoreFile selectFile : compactionContext.getRequest().getFiles()) {
+      selectedFilesNames.add(selectFile.getFileInfo().getPath().getName());
+    }
+    if (compactionFilesCache.addSelectedFiles(regionInfo, cfd, selectedFilesNames)) {
+      LOG.info("Update compactionFilesCache after select compaction success: {}", logStr);
+      status.setStatus("Update compactionFilesCache after select compaction success");
+      return new Pair<>(Boolean.TRUE, selectedFilesNames);
+    } else {
+      //should not happen
+      LOG.info("selected files are already in store and return: {}", logStr);
+      status.abort("Selected files are already in compactionFilesCache and 
return");
+      return new Pair<>(Boolean.FALSE, Collections.EMPTY_LIST);
+    }
+  }
+
+  /**
+   * Execute compaction in the process of compaction server
+   */
+  private void doCompaction(CompactionTask compactionTask) throws IOException {
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    HStore store = compactionTask.getStore();
+    CompactionContext compactionContext = compactionTask.getCompactionContext();
+    List<String> selectedFileNames = compactionTask.getSelectedFileNames();
+    MonitoredTask status = compactionTask.getStatus();
+    try {
+      LOG.info("Start compact store: {}, cf: {}, compaction context: {}", 
store, cfd,
+        compactionContext);
+      List<Path> newFiles =
+          
compactionContext.compact(compactThreadControl.getCompactionThroughputController(),
 null);
+      LOG.info("Finish compact store: {}, cf: {}, new files: {}", store, cfd, 
newFiles);
+      List<String> newFileNames = new ArrayList<>();
+      for (Path newFile : newFiles) {
+        newFileNames.add(newFile.getName());
+      }
+      reportCompactionCompleted(compactionTask, newFileNames, status);
+    } finally {
+      status.setStatus("Remove selected files");
+      LOG.info("Remove selected files: {}", compactionTask);
+      compactionFilesCache.removeSelectedFiles(regionInfo, cfd, selectedFileNames);
+    }
   }
 
-  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
-    AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
-    if (admin == null) {
-      LOG.debug("New RS admin connection to {}", sn);
-      admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
-      this.rsAdmins.put(sn, admin);
+  /**
+   * Report compaction completed to RS
+   * @return True if report to RS success, otherwise false
+   */
+  private boolean reportCompactionCompleted(CompactionTask task, List<String> newFiles,
+      MonitoredTask status) throws IOException {
+    ServerName rsServerName = task.getRsServerName();
+    RegionInfo regionInfo = task.getRegionInfo();
+    ColumnFamilyDescriptor cfd = task.getCfd();
+    List<String> selectedFileNames = task.getSelectedFileNames();
+    boolean newForceMajor = task.getStore().getForceMajor();
+    Builder builder =
+        CompleteCompactionRequest.newBuilder().setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
+            .setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setNewForceMajor(newForceMajor);
+    // use file name only, dose not include path, because the size of protobuf is too big
+    for (String selectFile : selectedFileNames) {
+      builder.addSelectedFiles(selectFile);
+    }
+    for (String newFile : newFiles) {
+      builder.addNewFiles(newFile);
+    }
+    CompleteCompactionRequest completeCompactionRequest = builder.build();
+    AsyncRegionServerAdmin rsAdmin = getRsAdmin(rsServerName);
+    try {
+      status
+          .setStatus("Report complete compaction to RS: " + rsServerName + ", selected file size: "
+              + selectedFileNames.size() + ", new file size: " + newFiles.size());
+      LOG.info("Report complete compaction: {}, selectedFileSize: {}, 
newFileSize: {}", task,
+        completeCompactionRequest.getSelectedFilesList().size(),
+        completeCompactionRequest.getNewFilesList().size());
+      CompleteCompactionResponse completeCompactionResponse =
+          FutureUtils.get(rsAdmin.completeCompaction(completeCompactionRequest));
+      if (completeCompactionResponse.getSuccess()) {
+        status.markComplete("Report to RS succeeded and RS accepted");
+        // move selected files to compacted files
+        compactionFilesCache.addCompactedFiles(regionInfo, cfd, selectedFileNames);
+        LOG.info("Compaction manager request complete compaction success. {}", task);
+      } else {
+        //TODO: maybe region is move, we need get latest regionserver name and retry
+        status.abort("Report to RS succeeded but RS denied");
+        LOG.warn("Compaction manager request complete compaction fail. {}", task);
+      }
+      return true;
+    } catch (IOException e) {
+      //TODO: rpc call broken, add retry
+      status.abort("Report to RS failed");
+      LOG.error("Compaction manager request complete compaction error. {}", 
task, e);
+      return false;
     }
-    return admin;
   }
 
-  public void requestCompaction() {
+  private List<HStoreFile> getExcludedStoreFiles(HStore store, Set<String> excludeFileNames) {
+    Collection<HStoreFile> storefiles = store.getStorefiles();
+    List<HStoreFile> storeFiles = new ArrayList<>();
+    for (HStoreFile storefile : storefiles) {
+      String name = storefile.getPath().getName();
+      if (excludeFileNames.contains(name)) {
+        storeFiles.add(storefile);
+      }
+    }
+    return storeFiles;
+  }
+
+  private HStore getStore(final Configuration conf, final FileSystem fs, final Path rootDir,
+      final TableDescriptor htd, final RegionInfo hri, final String familyName) throws IOException {
+    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs,
+        CommonFSUtils.getTableDir(rootDir, htd.getTableName()), hri);
+    HRegion region = new HRegion(regionFs, null, conf, htd, null);
+    HStore store = new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
+    OptionalLong maxSequenceId = store.getMaxSequenceId();
+    LOG.info("store max sequence id: {}", maxSequenceId.orElse(0));
+    region.getMVCC().advanceTo(maxSequenceId.orElse(0));
+    return store;
+  }
+
+  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) {
+    return server.getAsyncClusterConnection().getRegionServerAdmin(sn);
+  }
+
+  ConcurrentHashMap<String, CompactionTask> getRunningCompactionTasks() {
+    return runningCompactionTasks;
+  }
+
+  void waitForStop() {
+    compactThreadControl.waitForStop();
+  }
+
+  private void executeCompaction(CompactionTask compactionTask) {
+    try {
+      String taskName = compactionTask.getRsServerName() + "-"
+          + compactionTask.getRegionInfo().getRegionNameAsString() + "-"
+          + compactionTask.getCfd().getNameAsString() + "-" + System.currentTimeMillis();
+      compactionTask.setTaskName(taskName);
+      runningCompactionTasks.put(compactionTask.getTaskName(), compactionTask);
+      doCompaction(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Execute compaction task error: {}", compactionTask, e);

Review comment:
       The Throwable caught here is most likely thrown by `doCompaction`, so I will add a catch of IOException and a retry inside the `doCompaction` method. The logic here itself does not need to change.
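
       As a rough illustration of the intended retry (the bounded-retry helper and its shape are mine, not necessarily what the PR will end up doing):

```java
// Illustrative sketch only: bound the retries of the IO-failing step inside doCompaction,
// e.g. the reportCompactionCompleted(...) call, while leaving the surrounding logic as-is.
import java.io.IOException;
import java.util.concurrent.Callable;

final class RetrySketch {
  static <T> T callWithRetry(Callable<T> step, int maxAttempts) throws Exception {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return step.call();
      } catch (IOException e) {
        last = e; // only the transient IO failure is retried; anything else propagates
      }
    }
    throw last; // all attempts failed; the caller's finally block still runs
  }
}
```

       Inside `doCompaction` this would wrap only the report step, so a failed RPC does not abort the rest of the compaction bookkeeping.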

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,446 @@
 package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuse {@link HStore#selectCompaction}, {@link HStore#throttleCompaction},
+ * {@link CompactionContext#compact}, {@link CompactThreadControl}, which are core logic of
+ * compaction.
+ */
 @InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
+  private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+  // Configuration key for the large compaction threads.
+  private final static String LARGE_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.large";
+  private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+  // Configuration key for the small compaction threads.
+  private final static String SMALL_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.small";
+  private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
 
   private final Configuration conf;
-  private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
-      new ConcurrentHashMap<>();
   private final HCompactionServer server;
+  private HFileSystem fs;
+  private Path rootDir;
+  private FSTableDescriptors tableDescriptors;
+  private CompactThreadControl compactThreadControl;
+  private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+      new ConcurrentHashMap<>();
+  private static CompactionFilesCache compactionFilesCache = new CompactionFilesCache();
 
-  public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+  CompactionThreadManager(final Configuration conf, HCompactionServer server) {
     this.conf = conf;
     this.server = server;
+    try {
+      this.fs = new HFileSystem(this.conf, true);
+      this.rootDir = CommonFSUtils.getRootDir(this.conf);
+      this.tableDescriptors = new FSTableDescriptors(conf);
+      int largeThreads =
+          Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+      int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+      compactThreadControl = new CompactThreadControl(this, largeThreads, smallThreads,
+          COMPACTION_TASK_COMPARATOR, REJECTION);
+    } catch (Throwable t) {
+      LOG.error("Failed construction CompactionThreadManager", t);
+    }
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return server.getChoreService();
+  }
+
+  @Override
+  public double getCompactionPressure() {
+    double max = 0;
+    for (CompactionTask task : getRunningCompactionTasks().values()) {
+      double normCount = task.getStore().getCompactionPressure();
+      if (normCount > max) {
+        max = normCount;
+      }
+    }
+    return max;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
+
+  public void requestCompaction(CompactionTask compactionTask) {
+    try {
+      selectFileAndExecuteTask(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Failed requestCompaction {}", compactionTask, e);
+    }
+  }
+
+  private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+    ServerName rsServerName = compactionTask.getRsServerName();
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    String logStr = compactionTask.toString();
+    MonitoredTask status =
+        TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+            + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+    status.enableStatusJournal(false);
+    // 1. select compaction and check compaction context is present
+    LOG.info("Start select compaction {}", compactionTask);
+    status.setStatus("Start select compaction");
+    HStore store;
+    CompactionContext compactionContext;
+    Pair<Boolean, List<String>> updateSelectedFilesCacheResult;
+    // the synchronized ensure file in store, selected files in cache, compacted files in cache,
+    // the three has consistent state, we need this condition to guarantee correct selection
+    synchronized (compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
+      synchronized (compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
+        Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+          compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+        store = pair.getFirst();
+        Optional<CompactionContext> compaction = pair.getSecond();
+        if (!compaction.isPresent()) {
+          store.close();
+          LOG.info("Compaction context is empty: {}", compactionTask);
+          status.abort("Compaction context is empty and return");
+          return;
+        }
+        compactionContext = compaction.get();
+        // 2. update compactionFilesCache
+        updateSelectedFilesCacheResult =
+            updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+      } // end of synchronized selected files
+    } // end of synchronized compacted files
+    if (!updateSelectedFilesCacheResult.getFirst()) {
+      store.close();
+      return;
+    }
+    List<String> selectedFileNames = updateSelectedFilesCacheResult.getSecond();
+    compactionTask.setHStore(store);

Review comment:
       In `CSRpcServices#requestCompaction` we set some of the fields (regionInfo, cf, isMajor, priority); here we compute the remaining fields (store, selectedFileNames, compactionContext) and set them.
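
       To restate that flow as a sketch (the class body below is a simplified stand-in, not the PR's actual `CompactionTask`):

```java
// Illustrative only: which fields arrive with the RPC vs. which are computed during selection.
final class CompactionTaskSketch {
  // set in CSRpcServices#requestCompaction, straight from the request
  String regionInfo;
  String cf;
  boolean isMajor;
  int priority;

  // computed in CompactionThreadManager#selectFileAndExecuteTask before the task is submitted
  Object store;                          // the opened HStore
  Object compactionContext;              // result of HStore#selectCompaction
  java.util.List<String> selectedFileNames;
}
```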

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
##########
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class help manage compaction thread pools and compaction throughput controller,
+ * @see CompactSplit
+ * @see org.apache.hadoop.hbase.compactionserver.CompactionThreadManager
+ */
+@InterfaceAudience.Private
+public class CompactThreadControl {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactThreadControl.class);
+  private volatile ThreadPoolExecutor longCompactions;
+  private volatile ThreadPoolExecutor shortCompactions;
+  private volatile ThroughputController compactionThroughputController;
+  private BiConsumer<Runnable, ThreadPoolExecutor> rejection;
+
+  public CompactThreadControl(ThroughputControllerService server, int largeThreads,
+      int smallThreads, Comparator<Runnable> cmp,
+      BiConsumer<Runnable, ThreadPoolExecutor> rejection) {
+    createCompactionExecutors(largeThreads, smallThreads, cmp);
+
+    // compaction throughput controller
+    this.compactionThroughputController =
+        CompactionThroughputControllerFactory.create(server, server.getConfiguration());
+    // compaction throughput controller
+    this.rejection = rejection;
+  }
+
+  /**
+   * Cleanup class to use when rejecting a compaction request from the queue.
+   */
+  private class Rejection implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+      rejection.accept(runnable, pool);
+    }
+  }
+
+  void createCompactionExecutors(int largeThreads, int smallThreads, Comparator<Runnable> cmp) {
+    // if we have throttle threads, make sure the user also specified size
+    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
+
+    final String n = Thread.currentThread().getName();
+
+    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(cmp);
+    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
+        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
+            .setDaemon(true).build());
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
+    this.longCompactions.prestartAllCoreThreads();
+    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
+        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
+            .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
+    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+  }
+
+  @Override
+  public String toString() {
+    return "compactionQueue=(longCompactions=" + 
longCompactions.getQueue().size()
+        + ":shortCompactions=" + shortCompactions.getQueue().size() + ")";
+  }
+
+  public StringBuilder dumpQueue() {
+    StringBuilder queueLists = new StringBuilder();
+    queueLists.append("Compaction/Split Queue dump:\n");
+    queueLists.append("  LargeCompation Queue:\n");
+    BlockingQueue<Runnable> lq = longCompactions.getQueue();
+    Iterator<Runnable> it = lq.iterator();
+    while (it.hasNext()) {
+      queueLists.append("    " + it.next().toString());
+      queueLists.append("\n");
+    }
+
+    if (shortCompactions != null) {
+      queueLists.append("\n");
+      queueLists.append("  SmallCompation Queue:\n");
+      lq = shortCompactions.getQueue();
+      it = lq.iterator();
+      while (it.hasNext()) {
+        queueLists.append("    " + it.next().toString());
+        queueLists.append("\n");
+      }
+    }
+    return queueLists;
+  }
+
+  public ThreadPoolExecutor getLongCompactions() {
+    return longCompactions;
+  }
+
+  public ThreadPoolExecutor getShortCompactions() {
+    return shortCompactions;
+  }
+
+  public void
+      setCompactionThroughputController(ThroughputController compactionThroughputController) {

Review comment:
       This line break comes from the formatter; also, package-private access is enough here.
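
       i.e. something like the following inside `CompactThreadControl` (sketch only; dropping `public` also removes the odd wrap):

```java
  // package-private is enough per the review; only same-package callers are assumed
  void setCompactionThroughputController(ThroughputController compactionThroughputController) {
    this.compactionThroughputController = compactionThroughputController;
  }
```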

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
##########
@@ -231,6 +247,17 @@ public void run() {
         abort(prefix + t.getMessage(), t);
       }
     }
+    stopChores();
+    if (this.compactionThreadManager != null) {
+      this.compactionThreadManager.waitForStop();
+    }
+

Review comment:
       Deleted the line.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
##########
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class help manage compaction thread pools and compaction throughput controller,
+ * @see CompactSplit
+ * @see org.apache.hadoop.hbase.compactionserver.CompactionThreadManager
+ */
+@InterfaceAudience.Private
+public class CompactThreadControl {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactThreadControl.class);
+  private volatile ThreadPoolExecutor longCompactions;
+  private volatile ThreadPoolExecutor shortCompactions;
+  private volatile ThroughputController compactionThroughputController;
+  private BiConsumer<Runnable, ThreadPoolExecutor> rejection;
+
+  public CompactThreadControl(ThroughputControllerService server, int largeThreads,
+      int smallThreads, Comparator<Runnable> cmp,
+      BiConsumer<Runnable, ThreadPoolExecutor> rejection) {
+    createCompactionExecutors(largeThreads, smallThreads, cmp);
+
+    // compaction throughput controller
+    this.compactionThroughputController =
+        CompactionThroughputControllerFactory.create(server, server.getConfiguration());
+    // compaction throughput controller
+    this.rejection = rejection;
+  }
+
+  /**
+   * Cleanup class to use when rejecting a compaction request from the queue.
+   */
+  private class Rejection implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+      rejection.accept(runnable, pool);
+    }
+  }
+
+  void createCompactionExecutors(int largeThreads, int smallThreads, Comparator<Runnable> cmp) {
+    // if we have throttle threads, make sure the user also specified size
+    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
+
+    final String n = Thread.currentThread().getName();
+
+    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(cmp);
+    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
+        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
+            .setDaemon(true).build());
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
+    this.longCompactions.prestartAllCoreThreads();
+    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
+        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
+            .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
+    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+  }
+
+  @Override
+  public String toString() {
+    return "compactionQueue=(longCompactions=" + 
longCompactions.getQueue().size()
+        + ":shortCompactions=" + shortCompactions.getQueue().size() + ")";
+  }
+
+  public StringBuilder dumpQueue() {
+    StringBuilder queueLists = new StringBuilder();
+    queueLists.append("Compaction/Split Queue dump:\n");
+    queueLists.append("  LargeCompation Queue:\n");
+    BlockingQueue<Runnable> lq = longCompactions.getQueue();
+    Iterator<Runnable> it = lq.iterator();
+    while (it.hasNext()) {
+      queueLists.append("    " + it.next().toString());
+      queueLists.append("\n");
+    }
+
+    if (shortCompactions != null) {
+      queueLists.append("\n");
+      queueLists.append("  SmallCompation Queue:\n");
+      lq = shortCompactions.getQueue();
+      it = lq.iterator();
+      while (it.hasNext()) {
+        queueLists.append("    " + it.next().toString());
+        queueLists.append("\n");
+      }
+    }
+    return queueLists;
+  }
+
+  public ThreadPoolExecutor getLongCompactions() {
+    return longCompactions;
+  }
+
+  public ThreadPoolExecutor getShortCompactions() {
+    return shortCompactions;
+  }
+
+  public void
+      setCompactionThroughputController(ThroughputController compactionThroughputController) {
+    this.compactionThroughputController = compactionThroughputController;
+  }
+
+  public ThroughputController getCompactionThroughputController() {
+    return compactionThroughputController;
+  }
+
+  public void waitForStop() {
+    waitForPoolStop(longCompactions, "Large Compaction Thread");
+    waitForPoolStop(shortCompactions, "Small Compaction Thread");
+  }
+
+  private void waitForPoolStop(ThreadPoolExecutor t, String name) {
+    if (t == null) {
+      return;
+    }
+    try {
+      t.shutdown();
+      t.awaitTermination(60, TimeUnit.SECONDS);
+    } catch (InterruptedException ie) {
+      LOG.warn("Interrupted waiting for " + name + " to finish...");

Review comment:
       Yes, I will amend it.
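
       The parent comment is not quoted in this mail, so the following is only a guess at the usual amendment: keep the interrupt status instead of swallowing it. A sketch of the method inside `CompactThreadControl`:

```java
  private void waitForPoolStop(ThreadPoolExecutor t, String name) {
    if (t == null) {
      return;
    }
    try {
      t.shutdown();
      t.awaitTermination(60, TimeUnit.SECONDS);
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted waiting for {} to finish...", name, ie);
      Thread.currentThread().interrupt(); // restore the interrupt status for the caller
    }
  }
```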




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

