Repository: hbase
Updated Branches:
  refs/heads/branch-2 b2b5cd6de -> 89d2adfe9


HBASE-18083 Make large/small file clean thread number configurable in HFileCleaner


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/89d2adfe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/89d2adfe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/89d2adfe

Branch: refs/heads/branch-2
Commit: 89d2adfe92b2e775a0a3761257846e2d4ed7d141
Parents: b2b5cd6
Author: Yu Li <l...@apache.org>
Authored: Fri Jul 7 14:07:23 2017 +0800
Committer: Yu Li <l...@apache.org>
Committed: Fri Jul 7 15:39:34 2017 +0800

----------------------------------------------------------------------
 .../hbase/master/cleaner/HFileCleaner.java      | 154 +++++++++++++------
 .../hbase/master/cleaner/TestHFileCleaner.java  |  13 +-
 2 files changed, 120 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/89d2adfe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 8b3515a..6d1fbe3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,6 +68,16 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate> impleme
       "hbase.regionserver.hfilecleaner.small.queue.size";
   public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
 
+  // Configuration key for large file delete thread number
+  public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
+      "hbase.regionserver.hfilecleaner.large.thread.count";
+  public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
+
+  // Configuration key for small file delete thread number
+  public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
+      "hbase.regionserver.hfilecleaner.small.thread.count";
+  public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
+
   private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
 
   StealJobQueue<HFileDeleteTask> largeFileQueue;
@@ -74,11 +85,13 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate> impleme
   private int throttlePoint;
   private int largeQueueInitSize;
   private int smallQueueInitSize;
+  private int largeFileDeleteThreadNumber;
+  private int smallFileDeleteThreadNumber;
   private List<Thread> threads = new ArrayList<Thread>();
   private boolean running;
 
-  private long deletedLargeFiles = 0L;
-  private long deletedSmallFiles = 0L;
+  private AtomicLong deletedLargeFiles = new AtomicLong();
+  private AtomicLong deletedSmallFiles = new AtomicLong();
 
   /**
    * @param period the period of time to sleep between each run
@@ -100,6 +113,10 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate> impleme
         conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, 
DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
     largeFileQueue = new StealJobQueue<>(largeQueueInitSize, 
smallQueueInitSize);
     smallFileQueue = largeFileQueue.getStealFromQueue();
+    largeFileDeleteThreadNumber =
+        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, 
DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
+    smallFileDeleteThreadNumber =
+        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, 
DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
     startHFileDeleteThreads();
   }
 
@@ -183,30 +200,34 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate> impleme
     final String n = Thread.currentThread().getName();
     running = true;
     // start thread for large file deletion
-    Thread large = new Thread() {
-      @Override
-      public void run() {
-        consumerLoop(largeFileQueue);
-      }
-    };
-    large.setDaemon(true);
-    large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis());
-    large.start();
-    LOG.debug("Starting hfile cleaner for large files: " + large.getName());
-    threads.add(large);
+    for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
+      Thread large = new Thread() {
+        @Override
+        public void run() {
+          consumerLoop(largeFileQueue);
+        }
+      };
+      large.setDaemon(true);
+      large.setName(n + "-HFileCleaner.large." + i + "-" + 
System.currentTimeMillis());
+      large.start();
+      LOG.debug("Starting hfile cleaner for large files: " + large.getName());
+      threads.add(large);
+    }
 
     // start thread for small file deletion
-    Thread small = new Thread() {
-      @Override
-      public void run() {
-        consumerLoop(smallFileQueue);
-      }
-    };
-    small.setDaemon(true);
-    small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis());
-    small.start();
-    LOG.debug("Starting hfile cleaner for small files: " + small.getName());
-    threads.add(small);
+    for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
+      Thread small = new Thread() {
+        @Override
+        public void run() {
+          consumerLoop(smallFileQueue);
+        }
+      };
+      small.setDaemon(true);
+      small.setName(n + "-HFileCleaner.small." + i + "-" + 
System.currentTimeMillis());
+      small.start();
+      LOG.debug("Starting hfile cleaner for small files: " + small.getName());
+      threads.add(small);
+    }
   }
 
   protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
@@ -248,20 +269,20 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate> impleme
   // Currently only for testing purpose
   private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
     if (isLargeFile) {
-      if (deletedLargeFiles == Long.MAX_VALUE) {
+      if (deletedLargeFiles.get() == Long.MAX_VALUE) {
         LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter 
to 0");
-        deletedLargeFiles = 0L;
+        deletedLargeFiles.set(0L);
       }
-      deletedLargeFiles++;
+      deletedLargeFiles.incrementAndGet();
     } else {
-      if (deletedSmallFiles == Long.MAX_VALUE) {
+      if (deletedSmallFiles.get() == Long.MAX_VALUE) {
         LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter 
to 0");
-        deletedSmallFiles = 0L;
+        deletedSmallFiles.set(0L);;
       }
       if (fromLargeQueue && LOG.isTraceEnabled()) {
         LOG.trace("Stolen a small file deletion task in large file thread");
       }
-      deletedSmallFiles++;
+      deletedSmallFiles.incrementAndGet();
     }
   }
 
@@ -354,12 +375,12 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate> impleme
 
   @VisibleForTesting
   public long getNumOfDeletedLargeFiles() {
-    return deletedLargeFiles;
+    return deletedLargeFiles.get();
   }
 
   @VisibleForTesting
   public long getNumOfDeletedSmallFiles() {
-    return deletedSmallFiles;
+    return deletedSmallFiles.get();
   }
 
   @VisibleForTesting
@@ -379,19 +400,14 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate> impleme
 
   @Override
   public void onConfigurationChange(Configuration conf) {
-    StringBuilder builder = new StringBuilder();
-    builder.append("Updating configuration for HFileCleaner, previous throttle 
point: ")
-        .append(throttlePoint).append(", largeQueueInitSize: 
").append(largeQueueInitSize)
-        .append(", smallQueueInitSize: ").append(smallQueueInitSize);
+    if (!checkAndUpdateConfigurations(conf)) {
+      LOG.debug("Update configuration triggered but nothing changed for this 
cleaner");
+      return;
+    }
     stopHFileDeleteThreads();
-    this.throttlePoint =
-        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, 
DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
-    this.largeQueueInitSize =
-        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, 
DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
-    this.smallQueueInitSize =
-        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, 
DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
     // record the left over tasks
-    List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
+    List<HFileDeleteTask> leftOverTasks =
+        new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
     for (HFileDeleteTask task : largeFileQueue) {
       leftOverTasks.add(task);
     }
@@ -401,13 +417,59 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate> impleme
     largeFileQueue = new StealJobQueue<>(largeQueueInitSize, 
smallQueueInitSize);
     smallFileQueue = largeFileQueue.getStealFromQueue();
     threads.clear();
-    builder.append("; new throttle point: ").append(throttlePoint).append(", 
largeQueueInitSize: ")
-        .append(largeQueueInitSize).append(", smallQueueInitSize: 
").append(smallQueueInitSize);
-    LOG.debug(builder.toString());
     startHFileDeleteThreads();
     // re-dispatch the left over tasks
     for (HFileDeleteTask task : leftOverTasks) {
       dispatch(task);
     }
   }
+
+  /**
+   * Check new configuration and update settings if value changed
+   * @param conf The new configuration
+   * @return true if any configuration for HFileCleaner changes, false if no 
change
+   */
+  private boolean checkAndUpdateConfigurations(Configuration conf) {
+    boolean updated = false;
+    int throttlePoint =
+        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, 
DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+    if (throttlePoint != this.throttlePoint) {
+      LOG.debug("Updating throttle point, from " + this.throttlePoint + " to " 
+ throttlePoint);
+      this.throttlePoint = throttlePoint;
+      updated = true;
+    }
+    int largeQueueInitSize =
+        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, 
DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+    if (largeQueueInitSize != this.largeQueueInitSize) {
+      LOG.debug("Updating largeQueueInitSize, from " + this.largeQueueInitSize 
+ " to "
+          + largeQueueInitSize);
+      this.largeQueueInitSize = largeQueueInitSize;
+      updated = true;
+    }
+    int smallQueueInitSize =
+        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, 
DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+    if (smallQueueInitSize != this.smallQueueInitSize) {
+      LOG.debug("Updating smallQueueInitSize, from " + this.smallQueueInitSize 
+ " to "
+          + smallQueueInitSize);
+      this.smallQueueInitSize = smallQueueInitSize;
+      updated = true;
+    }
+    int largeFileDeleteThreadNumber =
+        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, 
DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
+    if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
+      LOG.debug("Updating largeFileDeleteThreadNumber, from " + 
this.largeFileDeleteThreadNumber
+          + " to " + largeFileDeleteThreadNumber);
+      this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
+      updated = true;
+    }
+    int smallFileDeleteThreadNumber =
+        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, 
DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
+    if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
+      LOG.debug("Updating smallFileDeleteThreadNumber, from " + 
this.smallFileDeleteThreadNumber
+          + " to " + smallFileDeleteThreadNumber);
+      this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
+      updated = true;
+    }
+    return updated;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/89d2adfe/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 5f05488..1c135b9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -327,6 +328,8 @@ public class TestHFileCleaner {
     final int UPDATE_QUEUE_INIT_SIZE = 1024;
     final int LARGE_FILE_NUM = 5;
     final int SMALL_FILE_NUM = 20;
+    final int LARGE_THREAD_NUM = 2;
+    final int SMALL_THREAD_NUM = 4;
 
     Configuration conf = UTIL.getConfiguration();
     // no cleaner policies = delete all files
@@ -369,6 +372,8 @@ public class TestHFileCleaner {
     newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, 
UPDATE_THROTTLE_POINT);
     newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, 
UPDATE_QUEUE_INIT_SIZE);
     newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, 
UPDATE_QUEUE_INIT_SIZE);
+    newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, 
LARGE_THREAD_NUM);
+    newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, 
SMALL_THREAD_NUM);
     LOG.debug("File deleted from large queue: " + 
cleaner.getNumOfDeletedLargeFiles()
         + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
     cleaner.onConfigurationChange(newConf);
@@ -377,7 +382,13 @@ public class TestHFileCleaner {
     Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
     Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, 
cleaner.getLargeQueueInitSize());
     Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, 
cleaner.getSmallQueueInitSize());
-    Assert.assertEquals(2, cleaner.getCleanerThreads().size());
+    Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, 
cleaner.getCleanerThreads().size());
+
+    // make sure no cost when onConfigurationChange called with no change
+    List<Thread> oldThreads = cleaner.getCleanerThreads();
+    cleaner.onConfigurationChange(newConf);
+    List<Thread> newThreads = cleaner.getCleanerThreads();
+    Assert.assertArrayEquals(oldThreads.toArray(), newThreads.toArray());
 
     // wait until clean done and check
     t.join();

Reply via email to