Repository: hbase Updated Branches: refs/heads/master a66d49189 -> cbcbcf4dc
HBASE-17854 Use StealJobQueue in HFileCleaner after HBASE-17215 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cbcbcf4d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cbcbcf4d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cbcbcf4d Branch: refs/heads/master Commit: cbcbcf4dcd3401327cc36173f3ca8e5362da1e0c Parents: a66d491 Author: Yu Li <l...@apache.org> Authored: Wed Apr 5 17:53:21 2017 +0800 Committer: Yu Li <l...@apache.org> Committed: Wed Apr 5 17:53:21 2017 +0800 ---------------------------------------------------------------------- .../hbase/master/cleaner/HFileCleaner.java | 98 +++++++++++++------- .../apache/hadoop/hbase/util/StealJobQueue.java | 22 +++++ .../hbase/master/cleaner/TestHFileCleaner.java | 28 +++--- .../hadoop/hbase/util/TestStealJobQueue.java | 2 +- 4 files changed, 102 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java index 3a68252..8b3515a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.util.StealJobQueue; import com.google.common.annotations.VisibleForTesting; /** @@ -57,23 +57,23 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme "hbase.regionserver.thread.hfilecleaner.throttle"; public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M - // Configuration key for large queue size - public final static String LARGE_HFILE_DELETE_QUEUE_SIZE = + // Configuration key for large queue initial size + public final static String LARGE_HFILE_QUEUE_INIT_SIZE = "hbase.regionserver.hfilecleaner.large.queue.size"; - public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576; + public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240; - // Configuration key for small queue size - public final static String SMALL_HFILE_DELETE_QUEUE_SIZE = + // Configuration key for small queue initial size + public final static String SMALL_HFILE_QUEUE_INIT_SIZE = "hbase.regionserver.hfilecleaner.small.queue.size"; - public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576; + public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240; private static final Log LOG = LogFactory.getLog(HFileCleaner.class); - BlockingQueue<HFileDeleteTask> largeFileQueue; + StealJobQueue<HFileDeleteTask> largeFileQueue; BlockingQueue<HFileDeleteTask> smallFileQueue; private int throttlePoint; - private int largeQueueSize; - private int smallQueueSize; + private int largeQueueInitSize; + private int smallQueueInitSize; private List<Thread> threads = new ArrayList<Thread>(); private boolean running; @@ -94,12 +94,12 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme directory, MASTER_HFILE_CLEANER_PLUGINS, params); throttlePoint = conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); - largeQueueSize = - conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE); - smallQueueSize = - conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE); - largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize); - smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize); + largeQueueInitSize = + conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); + smallQueueInitSize = + conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); + largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); + smallFileQueue = largeFileQueue.getStealFromQueue(); startHFileDeleteThreads(); } @@ -152,6 +152,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme private boolean dispatch(HFileDeleteTask task) { if (task.fileLength >= this.throttlePoint) { if (!this.largeFileQueue.offer(task)) { + // should never arrive here as long as we use PriorityQueue if (LOG.isTraceEnabled()) { LOG.trace("Large file deletion queue is full"); } @@ -159,6 +160,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme } } else { if (!this.smallFileQueue.offer(task)) { + // should never arrive here as long as we use PriorityQueue if (LOG.isTraceEnabled()) { LOG.trace("Small file deletion queue is full"); } @@ -232,7 +234,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme } task.setResult(succeed); if (succeed) { - countDeletedFiles(queue == largeFileQueue); + countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue); } } } @@ -244,8 +246,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme } // Currently only for testing purpose - private void countDeletedFiles(boolean isLarge) { - if (isLarge) { + private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) { + if (isLargeFile) { if (deletedLargeFiles == Long.MAX_VALUE) { LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0"); deletedLargeFiles = 0L; @@ -256,6 +258,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0"); deletedSmallFiles = 0L; } + if (fromLargeQueue && LOG.isTraceEnabled()) { + LOG.trace("Stolen a small file deletion task in large file thread"); + } deletedSmallFiles++; } } @@ -273,7 +278,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme } } - static class HFileDeleteTask { + static class HFileDeleteTask implements Comparable<HFileDeleteTask> { private static final long MAX_WAIT = 60 * 1000L; private static final long WAIT_UNIT = 1000L; @@ -315,6 +320,31 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme } return this.result; } + + @Override + public int compareTo(HFileDeleteTask o) { + long sub = this.fileLength - o.fileLength; + // smaller value with higher priority in PriorityQueue, and we intent to delete the larger + // file first. + return (sub > 0) ? -1 : (sub < 0 ? 1 : 0); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !(o instanceof HFileDeleteTask)) { + return false; + } + HFileDeleteTask otherTask = (HFileDeleteTask) o; + return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength); + } + + @Override + public int hashCode() { + return filePath.hashCode(); + } } @VisibleForTesting @@ -333,13 +363,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme } @VisibleForTesting - public long getLargeQueueSize() { - return largeQueueSize; + public long getLargeQueueInitSize() { + return largeQueueInitSize; } @VisibleForTesting - public long getSmallQueueSize() { - return smallQueueSize; + public long getSmallQueueInitSize() { + return smallQueueInitSize; } @VisibleForTesting @@ -351,15 +381,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme public void onConfigurationChange(Configuration conf) { StringBuilder builder = new StringBuilder(); builder.append("Updating configuration for HFileCleaner, previous throttle point: ") - .append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize) - .append(", smallQueueSize: ").append(smallQueueSize); + .append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize) + .append(", smallQueueInitSize: ").append(smallQueueInitSize); stopHFileDeleteThreads(); this.throttlePoint = conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); - this.largeQueueSize = - conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE); - this.smallQueueSize = - conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE); + this.largeQueueInitSize = + conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); + this.smallQueueInitSize = + conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); // record the left over tasks List<HFileDeleteTask> leftOverTasks = new ArrayList<>(); for (HFileDeleteTask task : largeFileQueue) { @@ -368,11 +398,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme for (HFileDeleteTask task : smallFileQueue) { leftOverTasks.add(task); } - largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize); - smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize); + largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); + smallFileQueue = largeFileQueue.getStealFromQueue(); threads.clear(); - builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize: ") - .append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize); + builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ") + .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize); LOG.debug(builder.toString()); startHFileDeleteThreads(); // re-dispatch the left over tasks http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java index 74f0747..5e7e232 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java @@ -48,6 +48,24 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> { public StealJobQueue() { this.stealFromQueue = new PriorityBlockingQueue<T>() { + + @Override + public boolean offer(T t) { + lock.lock(); + try { + notEmpty.signal(); + return super.offer(t); + } finally { + lock.unlock(); + } + } + }; + } + + public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) { + super(initCapacity); + this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity) { + @Override public boolean offer(T t) { lock.lock(); @@ -61,6 +79,10 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> { }; } + /** + * Get a queue whose job might be stolen by the consumer of this original queue + * @return the queue whose job could be stolen + */ public BlockingQueue<T> getStealFromQueue() { return stealFromQueue; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index 8e8a4dd..5f05488 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -322,9 +322,9 @@ public class TestHFileCleaner { public void testOnConfigurationChange() throws Exception { // constants final int ORIGINAL_THROTTLE_POINT = 512 * 1024; - final int ORIGINAL_QUEUE_SIZE = 512; + final int ORIGINAL_QUEUE_INIT_SIZE = 512; final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check - final int UPDATE_QUEUE_SIZE = 1024; + final int UPDATE_QUEUE_INIT_SIZE = 1024; final int LARGE_FILE_NUM = 5; final int SMALL_FILE_NUM = 20; @@ -332,8 +332,8 @@ public class TestHFileCleaner { // no cleaner policies = delete all files conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, ""); conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT); - conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE); - conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE); + conf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE); + conf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE); Server server = new DummyServer(); Path archivedHfileDir = new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); @@ -342,8 +342,8 @@ public class TestHFileCleaner { FileSystem fs = UTIL.getDFSCluster().getFileSystem(); final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint()); - Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize()); - Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize()); + Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize()); + Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize()); // clean up archive directory and create files for testing fs.delete(archivedHfileDir, true); @@ -359,22 +359,24 @@ public class TestHFileCleaner { }; t.setDaemon(true); t.start(); - // let the cleaner run for some while - Thread.sleep(20); + // wait until file clean started + while (cleaner.getNumOfDeletedSmallFiles() == 0) { + Thread.yield(); + } // trigger configuration change Configuration newConf = new Configuration(conf); newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT); - newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE); - newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE); - cleaner.onConfigurationChange(newConf); + newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE); + newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE); LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles() + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles()); + cleaner.onConfigurationChange(newConf); // check values after change Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint()); - Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize()); - Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize()); + Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize()); + Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize()); Assert.assertEquals(2, cleaner.getCleanerThreads().size()); // wait until clean done and check http://git-wip-us.apache.org/repos/asf/hbase/blob/cbcbcf4d/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java index b35f6f4..54fdaca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java @@ -38,7 +38,7 @@ import static org.junit.Assert.*; public class TestStealJobQueue { StealJobQueue<Integer> stealJobQueue; - BlockingQueue stealFromQueue; + BlockingQueue<Integer> stealFromQueue; @Before public void setup() {