Repository: hbase Updated Branches: refs/heads/master b401a35fd -> 5411d3ecb
HBASE-17972 Remove mergePool from CompactSplitThread (Guangxu Cheng) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5411d3ec Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5411d3ec Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5411d3ec Branch: refs/heads/master Commit: 5411d3ecb156a5128b9045bdb4e58850a10968fb Parents: b401a35 Author: tedyu <yuzhih...@gmail.com> Authored: Fri Apr 28 06:52:10 2017 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Fri Apr 28 06:52:10 2017 -0700 ---------------------------------------------------------------------- .../hbase/regionserver/CompactSplitThread.java | 52 +------------------- .../regionserver/TestCompactSplitThread.java | 6 --- 2 files changed, 1 insertion(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/5411d3ec/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index eba984a..cddfccb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -72,10 +72,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi // Configuration key for split threads public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; public final static int SPLIT_THREADS_DEFAULT = 1; - - // Configuration keys for merge threads - public final static String MERGE_THREADS = "hbase.regionserver.thread.merge"; - public final static int MERGE_THREADS_DEFAULT = 1; public static final String REGION_SERVER_REGION_SPLIT_LIMIT = "hbase.regionserver.regionSplitLimit"; @@ -87,7 +83,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi private final ThreadPoolExecutor longCompactions; private final ThreadPoolExecutor shortCompactions; private final ThreadPoolExecutor splits; - private final ThreadPoolExecutor mergePool; private volatile ThroughputController compactionThroughputController; @@ -150,15 +145,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return new Thread(r, name); } }); - int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT); - this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool( - mergeThreads, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-merges-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); // compaction throughput controller this.compactionThroughputController = @@ -170,8 +156,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return "compaction_queue=(" + longCompactions.getQueue().size() + ":" + shortCompactions.getQueue().size() + ")" - + ", split_queue=" + splits.getQueue().size() - + ", merge_queue=" + mergePool.getQueue().size(); + + ", split_queue=" + splits.getQueue().size(); } public String dumpQueue() { @@ -205,15 +190,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi queueLists.append("\n"); } - queueLists.append("\n"); - queueLists.append(" Region Merge Queue:\n"); - lq = mergePool.getQueue(); - it = lq.iterator(); - while (it.hasNext()) { - queueLists.append(" " + it.next().toString()); - queueLists.append("\n"); - } - return queueLists.toString(); } @@ -372,7 +348,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi */ void interruptIfNecessary() { splits.shutdown(); - mergePool.shutdown(); longCompactions.shutdown(); shortCompactions.shutdown(); } @@ -394,7 +369,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi void join() { waitFor(splits, "Split Thread"); - waitFor(mergePool, "Merge Thread"); waitFor(longCompactions, "Large Compaction Thread"); waitFor(shortCompactions, "Small Compaction Thread"); } @@ -641,21 +615,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } } - int mergeThreads = newConf.getInt(MERGE_THREADS, - MERGE_THREADS_DEFAULT); - if (this.mergePool.getCorePoolSize() != mergeThreads) { - LOG.info("Changing the value of " + MERGE_THREADS + - " from " + this.mergePool.getCorePoolSize() + " to " + - mergeThreads); - if(this.mergePool.getCorePoolSize() < mergeThreads) { - this.mergePool.setMaximumPoolSize(mergeThreads); - this.mergePool.setCorePoolSize(mergeThreads); - } else { - this.mergePool.setCorePoolSize(mergeThreads); - this.mergePool.setMaximumPoolSize(mergeThreads); - } - } - ThroughputController old = this.compactionThroughputController; if (old != null) { old.stop("configuration change"); @@ -680,10 +639,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return this.splits.getCorePoolSize(); } - protected int getMergeThreadNum() { - return this.mergePool.getCorePoolSize(); - } - /** * {@inheritDoc} */ @@ -706,11 +661,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } @VisibleForTesting - public long getCompletedMergeTaskCount() { - return mergePool.getCompletedTaskCount(); - } - - @VisibleForTesting /** * Shutdown the long compaction thread pool. * Should only be used in unit test to prevent long compaction thread pool from stealing job http://git-wip-us.apache.org/repos/asf/hbase/blob/5411d3ec/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java index c7b6c7c..f6dc8c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java @@ -80,7 +80,6 @@ public class TestCompactSplitThread { conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3); conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4); conf.setInt(CompactSplitThread.SPLIT_THREADS, 5); - conf.setInt(CompactSplitThread.MERGE_THREADS, 6); } @After @@ -113,13 +112,11 @@ public class TestCompactSplitThread { assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum()); assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum()); assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum()); - assertEquals(6, regionServer.compactSplitThread.getMergeThreadNum()); // change bigger configurations and do online update conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 4); conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 5); conf.setInt(CompactSplitThread.SPLIT_THREADS, 6); - conf.setInt(CompactSplitThread.MERGE_THREADS, 7); try { regionServer.compactSplitThread.onConfigurationChange(conf); } catch (IllegalArgumentException iae) { @@ -130,13 +127,11 @@ public class TestCompactSplitThread { assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum()); assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum()); assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum()); - assertEquals(7, regionServer.compactSplitThread.getMergeThreadNum()); // change smaller configurations and do online update conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 2); conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 3); conf.setInt(CompactSplitThread.SPLIT_THREADS, 4); - conf.setInt(CompactSplitThread.MERGE_THREADS, 5); try { regionServer.compactSplitThread.onConfigurationChange(conf); } catch (IllegalArgumentException iae) { @@ -147,7 +142,6 @@ public class TestCompactSplitThread { assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum()); assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum()); assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum()); - assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum()); } finally { conn.close(); }