nyl3532016 commented on a change in pull request #3425:
URL: https://github.com/apache/hbase/pull/3425#discussion_r662427040



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,483 @@
 package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import 
org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
+import 
org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.StealJobQueue;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
 @InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
   private static Logger LOG = 
LoggerFactory.getLogger(CompactionThreadManager.class);
+  // Configuration key for the large compaction threads.
+  private final static String LARGE_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.large";
+  private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+  // Configuration key for the small compaction threads.
+  private final static String SMALL_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.small";
+  private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
 
   private final Configuration conf;
   private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
       new ConcurrentHashMap<>();
   private final HCompactionServer server;
+  private HFileSystem fs;
+  private Path rootDir;
+  private FSTableDescriptors tableDescriptors;
+  // compaction pools
+  private volatile ThreadPoolExecutor longCompactions;
+  private volatile ThreadPoolExecutor shortCompactions;
+  private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+      new ConcurrentHashMap<>();
+  private PressureAwareCompactionThroughputController throughputController;
+  private CompactionServerStorage storage = new CompactionServerStorage();
 
-  public CompactionThreadManager(final Configuration conf, HCompactionServer 
server) {
+  CompactionThreadManager(final Configuration conf, HCompactionServer server) {
     this.conf = conf;
     this.server = server;
+    try {
+      this.fs = new HFileSystem(this.conf, true);
+      this.rootDir = CommonFSUtils.getRootDir(this.conf);
+      this.tableDescriptors = new FSTableDescriptors(conf);
+      // start compaction resources
+      this.throughputController = new 
PressureAwareCompactionThroughputController();
+      this.throughputController.setConf(conf);
+      this.throughputController.setup(this);
+      startCompactionPool();
+    } catch (Throwable t) {
+      LOG.error("Failed construction CompactionThreadManager", t);
+    }
+  }
+
+  private void startCompactionPool() {
+    final String n = Thread.currentThread().getName();
+    // threads pool used to execute short and long compactions
+    int largeThreads =
+        Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, 
LARGE_COMPACTION_THREADS_DEFAULT));
+    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, 
SMALL_COMPACTION_THREADS_DEFAULT);
+    StealJobQueue<Runnable> stealJobQueue =
+        new StealJobQueue<>(largeThreads, smallThreads, 
COMPACTION_TASK_COMPARATOR);
+    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 
60, TimeUnit.SECONDS,
+        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + 
"-longCompactions-%d")
+            .setDaemon(true).build());
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
+    this.longCompactions.prestartAllCoreThreads();
+    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 
60, TimeUnit.SECONDS,
+        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
+            .setNameFormat(n + 
"-shortCompactions-%d").setDaemon(true).build());
+    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return server.getChoreService();
+  }
+
+  @Override
+  public double getCompactionPressure() {
+    double max = 0;
+    for (CompactionTask task : getRunningCompactionTasks().values()) {
+      double normCount = task.getStore().getCompactionPressure();
+      if (normCount > max) {
+        max = normCount;
+      }
+    }
+    return max;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
+
+  public void requestCompaction(CompactionTask compactionTask) {
+    try {
+      selectFileAndExecuteTask(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Failed requestCompaction {}", compactionTask, e);
+    }
+  }
+
+  private void selectFileAndExecuteTask(CompactionTask compactionTask) throws 
IOException {
+    ServerName rsServerName = compactionTask.getRsServerName();
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    String logStr = compactionTask.toString();
+    MonitoredTask status =
+        TaskMonitor.get().createStatus("Compacting region: " + 
regionInfo.getRegionNameAsString()
+            + ", family: " + cfd.getNameAsString() + " from RS: " + 
rsServerName);
+    status.enableStatusJournal(false);
+    // 1. select compaction and check compaction context is present
+    LOG.info("Start select compaction {}", compactionTask);
+    status.setStatus("Start select compaction");
+    Pair<HStore, Optional<CompactionContext>> pair = 
selectCompaction(regionInfo, cfd,
+      compactionTask.isRequestMajor(), compactionTask.getPriority(), status, 
logStr);
+    HStore store = pair.getFirst();
+    Optional<CompactionContext> compaction = pair.getSecond();
+    if (!compaction.isPresent()) {
+      store.close();
+      LOG.info("Compaction context is empty: {}", compactionTask);
+      status.abort("Compaction context is empty and return");
+      return;
+    }
+    CompactionContext compactionContext = compaction.get();
+    // 2. update storage
+    Pair<Boolean, List<String>> updateStoreResult =
+        updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, 
status, logStr);
+    if (!updateStoreResult.getFirst()) {
+      store.close();
+      return;
+    }
+    List<String> selectedFileNames = updateStoreResult.getSecond();
+    compactionTask.setHStore(store);
+    compactionTask.setCompactionContext(compactionContext);
+    compactionTask.setSelectedFileNames(selectedFileNames);
+    compactionTask.setMonitoredTask(status);
+    // 3. execute a compaction task
+    ThreadPoolExecutor pool;
+    pool = store.throttleCompaction(compactionContext.getRequest().getSize()) 
? longCompactions
+        : shortCompactions;
+    pool.submit(new CompactionTaskRunner(compactionTask));
+  }
+
+  /**
+   * Open store, and select compaction context
+   * @return Store and CompactionContext
+   */
+  private Pair<HStore, Optional<CompactionContext>> 
selectCompaction(RegionInfo regionInfo,
+    ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask 
status, String logStr)
+      throws IOException {
+    status.setStatus("Open store");
+    tableDescriptors.get(regionInfo.getTable());
+    HStore store = getStore(conf, fs, rootDir, 
tableDescriptors.get(regionInfo.getTable()),
+      regionInfo, cfd.getNameAsString());
+    storage.cleanupCompactedFiles(regionInfo, cfd,

Review comment:
       Maybe it would be better to rename the class from
`CompactionServerStorage` to `CompactionFileCache`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to