Repository: hbase
Updated Branches:
  refs/heads/master b401a35fd -> 5411d3ecb


HBASE-17972 Remove mergePool from CompactSplitThread (Guangxu Cheng)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5411d3ec
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5411d3ec
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5411d3ec

Branch: refs/heads/master
Commit: 5411d3ecb156a5128b9045bdb4e58850a10968fb
Parents: b401a35
Author: tedyu <yuzhih...@gmail.com>
Authored: Fri Apr 28 06:52:10 2017 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Apr 28 06:52:10 2017 -0700

----------------------------------------------------------------------
 .../hbase/regionserver/CompactSplitThread.java  | 52 +-------------------
 .../regionserver/TestCompactSplitThread.java    |  6 ---
 2 files changed, 1 insertion(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5411d3ec/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index eba984a..cddfccb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -72,10 +72,6 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
   // Configuration key for split threads
   public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
   public final static int SPLIT_THREADS_DEFAULT = 1;
-  
-  // Configuration keys for merge threads
-  public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
-  public final static int MERGE_THREADS_DEFAULT = 1;
 
   public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
       "hbase.regionserver.regionSplitLimit";
@@ -87,7 +83,6 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
   private final ThreadPoolExecutor longCompactions;
   private final ThreadPoolExecutor shortCompactions;
   private final ThreadPoolExecutor splits;
-  private final ThreadPoolExecutor mergePool;
 
   private volatile ThroughputController compactionThroughputController;
 
@@ -150,15 +145,6 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
             return new Thread(r, name);
           }
       });
-    int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
-    this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
-        mergeThreads, new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-merges-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-        });
 
     // compaction throughput controller
     this.compactionThroughputController =
@@ -170,8 +156,7 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
     return "compaction_queue=("
         + longCompactions.getQueue().size() + ":"
         + shortCompactions.getQueue().size() + ")"
-        + ", split_queue=" + splits.getQueue().size()
-        + ", merge_queue=" + mergePool.getQueue().size();
+        + ", split_queue=" + splits.getQueue().size();
   }
   
   public String dumpQueue() {
@@ -205,15 +190,6 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
       queueLists.append("\n");
     }
 
-    queueLists.append("\n");
-    queueLists.append("  Region Merge Queue:\n");
-    lq = mergePool.getQueue();
-    it = lq.iterator();
-    while (it.hasNext()) {
-      queueLists.append("    " + it.next().toString());
-      queueLists.append("\n");
-    }
-
     return queueLists.toString();
   }
 
@@ -372,7 +348,6 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
    */
   void interruptIfNecessary() {
     splits.shutdown();
-    mergePool.shutdown();
     longCompactions.shutdown();
     shortCompactions.shutdown();
   }
@@ -394,7 +369,6 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
 
   void join() {
     waitFor(splits, "Split Thread");
-    waitFor(mergePool, "Merge Thread");
     waitFor(longCompactions, "Large Compaction Thread");
     waitFor(shortCompactions, "Small Compaction Thread");
   }
@@ -641,21 +615,6 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
       }
     }
 
-    int mergeThreads = newConf.getInt(MERGE_THREADS,
-            MERGE_THREADS_DEFAULT);
-    if (this.mergePool.getCorePoolSize() != mergeThreads) {
-      LOG.info("Changing the value of " + MERGE_THREADS +
-                " from " + this.mergePool.getCorePoolSize() + " to " +
-                mergeThreads);
-      if(this.mergePool.getCorePoolSize() < mergeThreads) {
-        this.mergePool.setMaximumPoolSize(mergeThreads);
-        this.mergePool.setCorePoolSize(mergeThreads);
-      } else {
-        this.mergePool.setCorePoolSize(mergeThreads);
-        this.mergePool.setMaximumPoolSize(mergeThreads);
-      }
-    }
-
     ThroughputController old = this.compactionThroughputController;
     if (old != null) {
       old.stop("configuration change");
@@ -680,10 +639,6 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
     return this.splits.getCorePoolSize();
   }
 
-  protected int getMergeThreadNum() {
-    return this.mergePool.getCorePoolSize();
-  }
-
   /**
    * {@inheritDoc}
    */
@@ -706,11 +661,6 @@ public class CompactSplitThread implements 
CompactionRequestor, PropagatingConfi
   }
 
   @VisibleForTesting
-  public long getCompletedMergeTaskCount() {
-    return mergePool.getCompletedTaskCount();
-  }
-
-  @VisibleForTesting
   /**
    * Shutdown the long compaction thread pool.
    * Should only be used in unit test to prevent long compaction thread pool 
from stealing job

http://git-wip-us.apache.org/repos/asf/hbase/blob/5411d3ec/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
index c7b6c7c..f6dc8c0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
@@ -80,7 +80,6 @@ public class TestCompactSplitThread {
     conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3);
     conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4);
     conf.setInt(CompactSplitThread.SPLIT_THREADS, 5);
-    conf.setInt(CompactSplitThread.MERGE_THREADS, 6);
   }
 
   @After
@@ -113,13 +112,11 @@ public class TestCompactSplitThread {
       assertEquals(3, 
regionServer.compactSplitThread.getLargeCompactionThreadNum());
       assertEquals(4, 
regionServer.compactSplitThread.getSmallCompactionThreadNum());
       assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum());
-      assertEquals(6, regionServer.compactSplitThread.getMergeThreadNum());
 
       // change bigger configurations and do online update
       conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 4);
       conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 5);
       conf.setInt(CompactSplitThread.SPLIT_THREADS, 6);
-      conf.setInt(CompactSplitThread.MERGE_THREADS, 7);
       try {
         regionServer.compactSplitThread.onConfigurationChange(conf);
       } catch (IllegalArgumentException iae) {
@@ -130,13 +127,11 @@ public class TestCompactSplitThread {
       assertEquals(4, 
regionServer.compactSplitThread.getLargeCompactionThreadNum());
       assertEquals(5, 
regionServer.compactSplitThread.getSmallCompactionThreadNum());
       assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum());
-      assertEquals(7, regionServer.compactSplitThread.getMergeThreadNum());
 
       // change smaller configurations and do online update
       conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 2);
       conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 3);
       conf.setInt(CompactSplitThread.SPLIT_THREADS, 4);
-      conf.setInt(CompactSplitThread.MERGE_THREADS, 5);
       try {
         regionServer.compactSplitThread.onConfigurationChange(conf);
       } catch (IllegalArgumentException iae) {
@@ -147,7 +142,6 @@ public class TestCompactSplitThread {
       assertEquals(2, 
regionServer.compactSplitThread.getLargeCompactionThreadNum());
       assertEquals(3, 
regionServer.compactSplitThread.getSmallCompactionThreadNum());
       assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum());
-      assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum());
     } finally {
       conn.close();
     }

Reply via email to