HBASE-14922 Delayed flush doesn't work causing flush storms.

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cd5ddc5f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cd5ddc5f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cd5ddc5f

Branch: refs/heads/hbase-12439
Commit: cd5ddc5fece7147d55d74117abb272cf8ea08c4d
Parents: c6b8e6f
Author: Elliott Clark <ecl...@apache.org>
Authored: Thu Dec 3 10:40:40 2015 -0800
Committer: Elliott Clark <ecl...@apache.org>
Committed: Fri Dec 4 16:54:01 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ChoreService.java   |  31 ++++-
 .../JitterScheduledThreadPoolExecutorImpl.java  | 123 +++++++++++++++++++
 .../apache/hadoop/hbase/TestChoreService.java   |  10 +-
 .../hbase/regionserver/HRegionServer.java       |   6 +-
 4 files changed, 157 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
index 2519f8f..091d854 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
@@ -87,11 +88,21 @@ public class ChoreService implements ChoreServicer {
   private final String coreThreadPoolPrefix;
 
   /**
+   *
    * @param coreThreadPoolPrefix Prefix that will be applied to the Thread 
name of all threads
    *          spawned by this service
    */
+  @VisibleForTesting
   public ChoreService(final String coreThreadPoolPrefix) {
-    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE);
+    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
+  }
+
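+  /**
+   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
+   *          spawned by this service
+   * @param jitter Whether the chore service should add some jitter to all of the scheduled
+   *               chores. When set to true this will add -10% to 10% jitter.
+   */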
+  public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) 
{
+    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
   }
 
   /**
@@ -101,11 +112,19 @@ public class ChoreService implements ChoreServicer {
    *          to during initialization. The default size is 1, but specifying 
a larger size may be
    *          beneficial if you know that 1 thread will not be enough.
    */
-  public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) {
+  public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, 
boolean jitter) {
     this.coreThreadPoolPrefix = coreThreadPoolPrefix;
-    if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE;
+    if (corePoolSize < MIN_CORE_POOL_SIZE)  {
+      corePoolSize = MIN_CORE_POOL_SIZE;
+    }
+
     final ThreadFactory threadFactory = new 
ChoreServiceThreadFactory(coreThreadPoolPrefix);
-    scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
+    if (jitter) {
+      scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, 
threadFactory, 0.1);
+    } else {
+      scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
+    }
+
     scheduler.setRemoveOnCancelPolicy(true);
     scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
     choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
@@ -127,7 +146,9 @@ public class ChoreService implements ChoreServicer {
    *         (typically occurs when a chore is scheduled during shutdown of 
service)
    */
   public synchronized boolean scheduleChore(ScheduledChore chore) {
-    if (chore == null) return false;
+    if (chore == null) {
+      return false;
+    }
 
     try {
       chore.setChoreServicer(this);

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java
new file mode 100644
index 0000000..95efa5a
--- /dev/null
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java
@@ -0,0 +1,123 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * ScheduledThreadPoolExecutor that will add some jitter to the value reported by
+ * RunnableScheduledFuture.getDelay.
+ *
+ * Every task handed to the executor is wrapped in a {@link JitteredRunnableScheduledFuture},
+ * whose reported delay is the wrapped task's delay moved up or down by a random amount bounded
+ * by {@code spread} times that delay.
+ *
+ * This will spread out things on a distributed cluster.
+ */
+public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor {
+  private final double spread;
+
+  /**
+   * Main constructor.
+   * @param corePoolSize passed through to the ScheduledThreadPoolExecutor constructor
+   * @param threadFactory passed through to the ScheduledThreadPoolExecutor constructor
+   * @param spread The fraction up and down that RunnableScheduledFuture.getDelay should be
+   *               jittered; the delay is multiplied by this value, so 0.1 means -10% to 10%.
+   */
+  public JitterScheduledThreadPoolExecutorImpl(int corePoolSize,
+                                               ThreadFactory threadFactory,
+                                               double spread) {
+    super(corePoolSize, threadFactory);
+    this.spread = spread;
+  }
+
+  /**
+   * Wraps the task created for a Runnable so that its reported delay is jittered.
+   * @param runnable the submitted Runnable (unused here)
+   * @param task the task the executor created for the Runnable
+   * @return a {@link JitteredRunnableScheduledFuture} delegating to {@code task}
+   */
+  @Override
+  protected <V> RunnableScheduledFuture<V> decorateTask(
+      Runnable runnable, RunnableScheduledFuture<V> task) {
+    return new JitteredRunnableScheduledFuture<>(task);
+  }
+
+  /**
+   * Wraps the task created for a Callable so that its reported delay is jittered.
+   * @param callable the submitted Callable (unused here)
+   * @param task the task the executor created for the Callable
+   * @return a {@link JitteredRunnableScheduledFuture} delegating to {@code task}
+   */
+  @Override
+  protected <V> RunnableScheduledFuture<V> decorateTask(
+      Callable<V> callable, RunnableScheduledFuture<V> task) {
+    return new JitteredRunnableScheduledFuture<>(task);
+  }
+
+  /**
+   * Class that basically just defers to the wrapped future.
+   * The only exception is getDelay, which adds jitter based on the enclosing executor's spread.
+   */
+  protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
+    private final RunnableScheduledFuture<V> wrapped;
+
+    JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) {
+      this.wrapped = wrapped;
+    }
+
+    @Override
+    public boolean isPeriodic() {
+      return wrapped.isPeriodic();
+    }
+
+    /**
+     * Returns the wrapped delay plus a random offset in [-spreadTime, spreadTime), where
+     * spreadTime is the wrapped delay multiplied by the spread and truncated to a long.
+     * When spreadTime is not positive, or the jittered value would be negative, the wrapped
+     * delay is returned unchanged.
+     */
+    @Override
+    public long getDelay(TimeUnit unit) {
+      long baseDelay = wrapped.getDelay(unit);
+      long spreadTime = (long) (baseDelay * spread);
+      // ThreadLocalRandom.nextLong(origin, bound) throws IllegalArgumentException unless
+      // origin < bound. spreadTime is zero when the delay is small in the requested unit and
+      // negative when the wrapped delay is negative, so only jitter a strictly positive spread.
+      long delay = (spreadTime <= 0) ? baseDelay
+          : baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime);
+      // Ensure that we don't roll over for nanoseconds.
+      return (delay < 0) ? baseDelay : delay;
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      return wrapped.compareTo(o);
+    }
+
+    @Override
+    public void run() {
+      wrapped.run();
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      return wrapped.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return wrapped.isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+      return wrapped.isDone();
+    }
+
+    @Override
+    public V get() throws InterruptedException, ExecutionException {
+      return wrapped.get();
+    }
+
+    @Override
+    public V get(long timeout,
+                 TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return wrapped.get(timeout, unit);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
index b113174..d1a6c19 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
@@ -315,7 +315,7 @@ public class TestChoreService {
     final int corePoolSize = 10;
     final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE;
 
-    ChoreService customInit = new 
ChoreService("testChoreServiceConstruction_custom", corePoolSize);
+    ChoreService customInit = new 
ChoreService("testChoreServiceConstruction_custom", corePoolSize, false);
     try {
       assertEquals(corePoolSize, customInit.getCorePoolSize());
     } finally {
@@ -329,11 +329,11 @@ public class TestChoreService {
       shutdownService(defaultInit);
     }
 
-    ChoreService invalidInit = new 
ChoreService("testChoreServiceConstruction_invalid", -10);
+    ChoreService invalidInit = new 
ChoreService("testChoreServiceConstruction_invalid", -10, false);
     try {
       assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize());
     } finally {
-    shutdownService(invalidInit);
+      shutdownService(invalidInit);
     }
   }
 
@@ -403,7 +403,7 @@ public class TestChoreService {
   @Test (timeout=20000)
   public void testCorePoolIncrease() throws InterruptedException {
     final int initialCorePoolSize = 3;
-    ChoreService service = new ChoreService("testCorePoolIncrease", 
initialCorePoolSize);
+    ChoreService service = new ChoreService("testCorePoolIncrease", 
initialCorePoolSize, false);
 
     try {
       assertEquals("Should have a core pool of size: " + initialCorePoolSize, 
initialCorePoolSize,
@@ -443,7 +443,7 @@ public class TestChoreService {
   @Test(timeout = 30000)
   public void testCorePoolDecrease() throws InterruptedException {
     final int initialCorePoolSize = 3;
-    ChoreService service = new ChoreService("testCorePoolDecrease", 
initialCorePoolSize);
+    ChoreService service = new ChoreService("testCorePoolDecrease", 
initialCorePoolSize, false);
     final int chorePeriod = 100;
     try {
       // Slow chores always miss their start time and thus the core pool size 
should be at least as

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2ce2193..211fed5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -605,7 +605,7 @@ public class HRegionServer extends HasThread implements
     rpcServices.start();
     putUpWebUI();
     this.walRoller = new LogRoller(this, this);
-    this.choreService = new ChoreService(getServerName().toString());
+    this.choreService = new ChoreService(getServerName().toString(), true);
 
     if (!SystemUtils.IS_OS_WINDOWS) {
       Signal.handle(new Signal("HUP"), new SignalHandler() {
@@ -1574,8 +1574,8 @@ public class HRegionServer extends HasThread implements
 
   static class PeriodicMemstoreFlusher extends ScheduledChore {
     final HRegionServer server;
-    final static int RANGE_OF_DELAY = 20000; //millisec
-    final static int MIN_DELAY_TIME = 3000; //millisec
+    final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
+    final static int MIN_DELAY_TIME = 0; // millisec
     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer 
server) {
       super(server.getServerName() + "-MemstoreFlusherChore", server, 
cacheFlushInterval);
       this.server = server;

Reply via email to