Repository: hbase
Updated Branches:
  refs/heads/master 7bfbb6a3c -> 0e147a9d6


HBASE-14954 IllegalArgumentException was thrown when doing online configuration change in CompactSplitThread (Victor Xu)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0e147a9d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0e147a9d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0e147a9d

Branch: refs/heads/master
Commit: 0e147a9d6e53e71ad2e57f512b4d3e1eeeac0b78
Parents: 7bfbb6a
Author: tedyu <yuzhih...@gmail.com>
Authored: Wed Dec 9 07:18:08 2015 -0800
Committer: tedyu <yuzhih...@gmail.com>
Committed: Wed Dec 9 07:18:08 2015 -0800

----------------------------------------------------------------------
 .../hbase/regionserver/CompactSplitThread.java  |  46 ++++++--
 .../regionserver/TestCompactSplitThread.java    | 104 +++++++++++++++++++
 2 files changed, 141 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0e147a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 6ce90bc..f54f008 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -618,8 +618,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
       LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
               " from " + this.longCompactions.getCorePoolSize() + " to " +
               largeThreads);
-      this.longCompactions.setMaximumPoolSize(largeThreads);
-      this.longCompactions.setCorePoolSize(largeThreads);
+      if(this.longCompactions.getCorePoolSize() < largeThreads) {
+        this.longCompactions.setMaximumPoolSize(largeThreads);
+        this.longCompactions.setCorePoolSize(largeThreads);
+      } else {
+        this.longCompactions.setCorePoolSize(largeThreads);
+        this.longCompactions.setMaximumPoolSize(largeThreads);
+      }
     }
 
     int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
@@ -628,8 +633,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
       LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
                 " from " + this.shortCompactions.getCorePoolSize() + " to " +
                 smallThreads);
-      this.shortCompactions.setMaximumPoolSize(smallThreads);
-      this.shortCompactions.setCorePoolSize(smallThreads);
+      if(this.shortCompactions.getCorePoolSize() < smallThreads) {
+        this.shortCompactions.setMaximumPoolSize(smallThreads);
+        this.shortCompactions.setCorePoolSize(smallThreads);
+      } else {
+        this.shortCompactions.setCorePoolSize(smallThreads);
+        this.shortCompactions.setMaximumPoolSize(smallThreads);
+      }
     }
 
     int splitThreads = newConf.getInt(SPLIT_THREADS,
@@ -638,8 +648,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
       LOG.info("Changing the value of " + SPLIT_THREADS +
                 " from " + this.splits.getCorePoolSize() + " to " +
                 splitThreads);
-      this.splits.setMaximumPoolSize(smallThreads);
-      this.splits.setCorePoolSize(smallThreads);
+      if(this.splits.getCorePoolSize() < splitThreads) {
+        this.splits.setMaximumPoolSize(splitThreads);
+        this.splits.setCorePoolSize(splitThreads);
+      } else {
+        this.splits.setCorePoolSize(splitThreads);
+        this.splits.setMaximumPoolSize(splitThreads);
+      }
     }
 
     int mergeThreads = newConf.getInt(MERGE_THREADS,
@@ -648,8 +663,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
       LOG.info("Changing the value of " + MERGE_THREADS +
                 " from " + this.mergePool.getCorePoolSize() + " to " +
                 mergeThreads);
-      this.mergePool.setMaximumPoolSize(smallThreads);
-      this.mergePool.setCorePoolSize(smallThreads);
+      if(this.mergePool.getCorePoolSize() < mergeThreads) {
+        this.mergePool.setMaximumPoolSize(mergeThreads);
+        this.mergePool.setCorePoolSize(mergeThreads);
+      } else {
+        this.mergePool.setCorePoolSize(mergeThreads);
+        this.mergePool.setMaximumPoolSize(mergeThreads);
+      }
     }
 
     CompactionThroughputController old = this.compactionThroughputController;
@@ -668,10 +688,18 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
     return this.shortCompactions.getCorePoolSize();
   }
 
-  public int getLargeCompactionThreadNum() {
+  protected int getLargeCompactionThreadNum() {
     return this.longCompactions.getCorePoolSize();
   }
 
+  protected int getSplitThreadNum() {
+    return this.splits.getCorePoolSize();
+  }
+
+  protected int getMergeThreadNum() {
+    return this.mergePool.getCorePoolSize();
+  }
+
   /**
    * {@inheritDoc}
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e147a9d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
new file mode 100644
index 0000000..022279a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(MediumTests.class)
+public class TestCompactSplitThread {
+  private static final Log LOG = LogFactory.getLog(TestCompactSplitThread.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+  private final byte[] family = Bytes.toBytes("f");
+
+  @Test
+  public void testThreadPoolSizeTuning() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3);
+    conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4);
+    conf.setInt(CompactSplitThread.SPLIT_THREADS, 5);
+    conf.setInt(CompactSplitThread.MERGE_THREADS, 6);
+    TEST_UTIL.startMiniCluster(1);
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      HTableDescriptor htd = new HTableDescriptor(tableName);
+      htd.addFamily(new HColumnDescriptor(family));
+      htd.setCompactionEnabled(false);
+      TEST_UTIL.getHBaseAdmin().createTable(htd);
+      TEST_UTIL.waitTableAvailable(tableName);
+      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+
+      // check initial configuration of thread pool sizes
+      assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum());
+      assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum());
+      assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum());
+      assertEquals(6, regionServer.compactSplitThread.getMergeThreadNum());
+
+      // change bigger configurations and do online update
+      conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 4);
+      conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 5);
+      conf.setInt(CompactSplitThread.SPLIT_THREADS, 6);
+      conf.setInt(CompactSplitThread.MERGE_THREADS, 7);
+      try {
+        regionServer.compactSplitThread.onConfigurationChange(conf);
+      } catch (IllegalArgumentException iae) {
+        Assert.fail("Update bigger configuration failed!");
+      }
+
+      // check again after online update
+      assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum());
+      assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum());
+      assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum());
+      assertEquals(7, regionServer.compactSplitThread.getMergeThreadNum());
+
+      // change smaller configurations and do online update
+      conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 2);
+      conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 3);
+      conf.setInt(CompactSplitThread.SPLIT_THREADS, 4);
+      conf.setInt(CompactSplitThread.MERGE_THREADS, 5);
+      try {
+        regionServer.compactSplitThread.onConfigurationChange(conf);
+      } catch (IllegalArgumentException iae) {
+        Assert.fail("Update smaller configuration failed!");
+      }
+
+      // check again after online update
+      assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum());
+      assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum());
+      assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum());
+      assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum());
+    } finally {
+      conn.close();
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+}

Reply via email to