Repository: hbase
Updated Branches:
  refs/heads/master a66d49189 -> cbcbcf4dc


HBASE-17854 Use StealJobQueue in HFileCleaner after HBASE-17215


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cbcbcf4d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cbcbcf4d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cbcbcf4d

Branch: refs/heads/master
Commit: cbcbcf4dcd3401327cc36173f3ca8e5362da1e0c
Parents: a66d491
Author: Yu Li <l...@apache.org>
Authored: Wed Apr 5 17:53:21 2017 +0800
Committer: Yu Li <l...@apache.org>
Committed: Wed Apr 5 17:53:21 2017 +0800

----------------------------------------------------------------------
 .../hbase/master/cleaner/HFileCleaner.java      | 98 +++++++++++++-------
 .../apache/hadoop/hbase/util/StealJobQueue.java | 22 +++++
 .../hbase/master/cleaner/TestHFileCleaner.java  | 28 +++---
 .../hadoop/hbase/util/TestStealJobQueue.java    |  2 +-
 4 files changed, 102 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
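
In short: HFileCleaner's two independent LinkedBlockingQueues are replaced by a
single StealJobQueue pair. The large-file queue owns the pair and the small-file
queue is the companion queue it exposes, so an otherwise idle large-file deletion
thread can pick up pending small-file tasks. A minimal sketch of the new wiring,
mirroring the constructor hunk below (field and key names as in the patch, the
surrounding class omitted):

    // The large-file queue owns the pair; the small-file queue is its "steal from" side.
    StealJobQueue<HFileDeleteTask> largeFileQueue =
        new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
    BlockingQueue<HFileDeleteTask> smallFileQueue = largeFileQueue.getStealFromQueue();

Both sides are backed by unbounded PriorityBlockingQueues, which is why the
existing queue.size keys now only control initial capacity (default 10240 instead
of 1048576) and why offer() is not expected to fail in dispatch().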


http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 3a68252..8b3515a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.StealJobQueue;
 
 import com.google.common.annotations.VisibleForTesting;
 /**
@@ -57,23 +57,23 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       "hbase.regionserver.thread.hfilecleaner.throttle";
   public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
 
-  // Configuration key for large queue size
-  public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
+  // Configuration key for large queue initial size
+  public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
       "hbase.regionserver.hfilecleaner.large.queue.size";
-  public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576;
+  public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
 
-  // Configuration key for small queue size
-  public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
+  // Configuration key for small queue initial size
+  public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
       "hbase.regionserver.hfilecleaner.small.queue.size";
-  public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
+  public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
 
   private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
 
-  BlockingQueue<HFileDeleteTask> largeFileQueue;
+  StealJobQueue<HFileDeleteTask> largeFileQueue;
   BlockingQueue<HFileDeleteTask> smallFileQueue;
   private int throttlePoint;
-  private int largeQueueSize;
-  private int smallQueueSize;
+  private int largeQueueInitSize;
+  private int smallQueueInitSize;
   private List<Thread> threads = new ArrayList<Thread>();
   private boolean running;
 
@@ -94,12 +94,12 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       directory, MASTER_HFILE_CLEANER_PLUGINS, params);
     throttlePoint =
         conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
-    largeQueueSize =
-        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
-    smallQueueSize =
-        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
-    largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
-    smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+    largeQueueInitSize =
+        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+    smallQueueInitSize =
+        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+    smallFileQueue = largeFileQueue.getStealFromQueue();
     startHFileDeleteThreads();
   }
 
@@ -152,6 +152,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   private boolean dispatch(HFileDeleteTask task) {
     if (task.fileLength >= this.throttlePoint) {
       if (!this.largeFileQueue.offer(task)) {
+        // should never arrive here as long as we use PriorityQueue
         if (LOG.isTraceEnabled()) {
           LOG.trace("Large file deletion queue is full");
         }
@@ -159,6 +160,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       }
     } else {
       if (!this.smallFileQueue.offer(task)) {
+        // should never arrive here as long as we use PriorityQueue
         if (LOG.isTraceEnabled()) {
           LOG.trace("Small file deletion queue is full");
         }
@@ -232,7 +234,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
           }
           task.setResult(succeed);
           if (succeed) {
-            countDeletedFiles(queue == largeFileQueue);
+            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
           }
         }
       }
@@ -244,8 +246,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   }
 
   // Currently only for testing purpose
-  private void countDeletedFiles(boolean isLarge) {
-    if (isLarge) {
+  private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
+    if (isLargeFile) {
       if (deletedLargeFiles == Long.MAX_VALUE) {
         LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter 
to 0");
         deletedLargeFiles = 0L;
@@ -256,6 +258,9 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate> impleme
         LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter 
to 0");
         deletedSmallFiles = 0L;
       }
+      if (fromLargeQueue && LOG.isTraceEnabled()) {
+        LOG.trace("Stolen a small file deletion task in large file thread");
+      }
       deletedSmallFiles++;
     }
   }
@@ -273,7 +278,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
     }
   }
 
-  static class HFileDeleteTask {
+  static class HFileDeleteTask implements Comparable<HFileDeleteTask> {
     private static final long MAX_WAIT = 60 * 1000L;
     private static final long WAIT_UNIT = 1000L;
 
@@ -315,6 +320,31 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       }
       return this.result;
     }
+
+    @Override
+    public int compareTo(HFileDeleteTask o) {
+      long sub = this.fileLength - o.fileLength;
+      // smaller value with higher priority in PriorityQueue, and we intend to delete the larger
+      // file first.
+      return (sub > 0) ? -1 : (sub < 0 ? 1 : 0);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || !(o instanceof HFileDeleteTask)) {
+        return false;
+      }
+      HFileDeleteTask otherTask = (HFileDeleteTask) o;
+      return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength);
+    }
+
+    @Override
+    public int hashCode() {
+      return filePath.hashCode();
+    }
   }
 
   @VisibleForTesting
@@ -333,13 +363,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   }
 
   @VisibleForTesting
-  public long getLargeQueueSize() {
-    return largeQueueSize;
+  public long getLargeQueueInitSize() {
+    return largeQueueInitSize;
   }
 
   @VisibleForTesting
-  public long getSmallQueueSize() {
-    return smallQueueSize;
+  public long getSmallQueueInitSize() {
+    return smallQueueInitSize;
   }
 
   @VisibleForTesting
@@ -351,15 +381,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   public void onConfigurationChange(Configuration conf) {
     StringBuilder builder = new StringBuilder();
     builder.append("Updating configuration for HFileCleaner, previous throttle 
point: ")
-        .append(throttlePoint).append(", largeQueueSize: 
").append(largeQueueSize)
-        .append(", smallQueueSize: ").append(smallQueueSize);
+        .append(throttlePoint).append(", largeQueueInitSize: 
").append(largeQueueInitSize)
+        .append(", smallQueueInitSize: ").append(smallQueueInitSize);
     stopHFileDeleteThreads();
     this.throttlePoint =
         conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
-    this.largeQueueSize =
-        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
-    this.smallQueueSize =
-        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+    this.largeQueueInitSize =
+        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+    this.smallQueueInitSize =
+        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
     // record the left over tasks
     List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
     for (HFileDeleteTask task : largeFileQueue) {
@@ -368,11 +398,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
     for (HFileDeleteTask task : smallFileQueue) {
       leftOverTasks.add(task);
     }
-    largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
-    smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+    smallFileQueue = largeFileQueue.getStealFromQueue();
     threads.clear();
-    builder.append("; new throttle point: ").append(throttlePoint).append(", 
largeQueueSize: ")
-        .append(largeQueueSize).append(", smallQueueSize: 
").append(smallQueueSize);
+    builder.append("; new throttle point: ").append(throttlePoint).append(", 
largeQueueInitSize: ")
+        .append(largeQueueInitSize).append(", smallQueueInitSize: 
").append(smallQueueInitSize);
     LOG.debug(builder.toString());
     startHFileDeleteThreads();
     // re-dispatch the left over tasks
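
The other half of the change is that HFileDeleteTask now implements Comparable so
that larger files sort ahead of smaller ones: a smaller compareTo() result means
higher priority in a PriorityQueue, hence the inverted sign. A self-contained
illustration of that ordering, using a simplified stand-in type since the task's
constructor is not part of this diff:

    import java.util.concurrent.PriorityBlockingQueue;

    public class LargestFirstDemo {
      // Stand-in for HFileDeleteTask with the same compareTo() logic as the patch.
      static class Task implements Comparable<Task> {
        final long fileLength;
        Task(long fileLength) { this.fileLength = fileLength; }
        @Override
        public int compareTo(Task o) {
          long sub = this.fileLength - o.fileLength;
          // Larger file, smaller compareTo() value, polled first.
          return (sub > 0) ? -1 : (sub < 0 ? 1 : 0);
        }
      }

      public static void main(String[] args) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        queue.offer(new Task(1L << 20));  // a 1 MB file
        queue.offer(new Task(1L << 30));  // a 1 GB file
        System.out.println(queue.poll().fileLength);  // 1073741824: the 1 GB task comes out first
      }
    }

Since file lengths are non-negative the subtraction cannot overflow, although
Long.compare(o.fileLength, this.fileLength) would express the same ordering.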

http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
index 74f0747..5e7e232 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
@@ -48,6 +48,24 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
 
   public StealJobQueue() {
     this.stealFromQueue = new PriorityBlockingQueue<T>() {
+
+      @Override
+      public boolean offer(T t) {
+        lock.lock();
+        try {
+          notEmpty.signal();
+          return super.offer(t);
+        } finally {
+          lock.unlock();
+        }
+      }
+    };
+  }
+
+  public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
+    super(initCapacity);
+    this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity) {
+
       @Override
       public boolean offer(T t) {
         lock.lock();
@@ -61,6 +79,10 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
     };
   }
 
+  /**
+   * Get a queue whose job might be stolen by the consumer of this original queue
+   * @return the queue whose job could be stolen
+   */
   public BlockingQueue<T> getStealFromQueue() {
     return stealFromQueue;
   }
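
This hunk only adds the sized constructor and a javadoc; the steal itself lives in
the take()/poll() logic StealJobQueue already has from HBASE-17215 (note how the
inner offer() signals the shared notEmpty condition so a consumer blocked on the
main queue wakes up). Assuming that existing behavior, a small usage sketch:

    import java.util.concurrent.BlockingQueue;
    import org.apache.hadoop.hbase.util.StealJobQueue;

    public class StealDemo {
      public static void main(String[] args) throws InterruptedException {
        // The two arguments are initial capacities only; both queues are unbounded.
        StealJobQueue<Integer> mainQueue = new StealJobQueue<>(64, 64);
        BlockingQueue<Integer> stealFrom = mainQueue.getStealFromQueue();

        stealFrom.offer(42);  // work arrives only on the steal-from side
        // With mainQueue empty, its consumer is expected to steal the task,
        // which is how a large-file thread ends up deleting small files.
        System.out.println("taken: " + mainQueue.take());
      }
    }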

http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 8e8a4dd..5f05488 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -322,9 +322,9 @@ public class TestHFileCleaner {
   public void testOnConfigurationChange() throws Exception {
     // constants
     final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
-    final int ORIGINAL_QUEUE_SIZE = 512;
+    final int ORIGINAL_QUEUE_INIT_SIZE = 512;
     final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check
-    final int UPDATE_QUEUE_SIZE = 1024;
+    final int UPDATE_QUEUE_INIT_SIZE = 1024;
     final int LARGE_FILE_NUM = 5;
     final int SMALL_FILE_NUM = 20;
 
@@ -332,8 +332,8 @@ public class TestHFileCleaner {
     // no cleaner policies = delete all files
     conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
     conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
-    conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
-    conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+    conf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
+    conf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
     Server server = new DummyServer();
     Path archivedHfileDir =
         new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
@@ -342,8 +342,8 @@ public class TestHFileCleaner {
     FileSystem fs = UTIL.getDFSCluster().getFileSystem();
     final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
     Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
-    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
-    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
+    Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
+    Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
 
     // clean up archive directory and create files for testing
     fs.delete(archivedHfileDir, true);
@@ -359,22 +359,24 @@ public class TestHFileCleaner {
     };
     t.setDaemon(true);
     t.start();
-    // let the cleaner run for some while
-    Thread.sleep(20);
+    // wait until file clean started
+    while (cleaner.getNumOfDeletedSmallFiles() == 0) {
+      Thread.yield();
+    }
 
     // trigger configuration change
     Configuration newConf = new Configuration(conf);
     newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
-    newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
-    newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
-    cleaner.onConfigurationChange(newConf);
+    newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
+    newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
     LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
         + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+    cleaner.onConfigurationChange(newConf);
 
     // check values after change
     Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
-    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
-    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
+    Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
+    Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
     Assert.assertEquals(2, cleaner.getCleanerThreads().size());
 
     // wait until clean done and check
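
One more behavioral note on the test: instead of sleeping a fixed 20 ms and hoping
the cleaner has started, it now spins until at least one small file has been
deleted before swapping the configuration. A bounded variant of the same wait
(purely illustrative, not part of the patch) would look like:

    // Hypothetical bounded wait; the committed test uses Thread.yield() with no
    // explicit deadline.
    long deadline = System.currentTimeMillis() + 60_000L;
    while (cleaner.getNumOfDeletedSmallFiles() == 0
        && System.currentTimeMillis() < deadline) {
      Thread.yield();
    }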

http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
index b35f6f4..54fdaca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.*;
 public class TestStealJobQueue {
 
   StealJobQueue<Integer> stealJobQueue;
-  BlockingQueue stealFromQueue;
+  BlockingQueue<Integer> stealFromQueue;
 
   @Before
   public void setup() {
