HBASE-14922 Delayed flush doesn't work causing flush storms.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cd5ddc5f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cd5ddc5f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cd5ddc5f Branch: refs/heads/hbase-12439 Commit: cd5ddc5fece7147d55d74117abb272cf8ea08c4d Parents: c6b8e6f Author: Elliott Clark <ecl...@apache.org> Authored: Thu Dec 3 10:40:40 2015 -0800 Committer: Elliott Clark <ecl...@apache.org> Committed: Fri Dec 4 16:54:01 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ChoreService.java | 31 ++++- .../JitterScheduledThreadPoolExecutorImpl.java | 123 +++++++++++++++++++ .../apache/hadoop/hbase/TestChoreService.java | 10 +- .../hbase/regionserver/HRegionServer.java | 6 +- 4 files changed, 157 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java index 2519f8f..091d854 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer; @@ -87,11 +88,21 @@ public class ChoreService implements ChoreServicer { private final String coreThreadPoolPrefix; /** + * * @param 
coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads * spawned by this service */ + @VisibleForTesting public ChoreService(final String coreThreadPoolPrefix) { - this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE); + this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false); + } + + /** + * @param jitter Should chore service add some jitter for all of the scheduled chores. When set + * to true this will add -10% to 10% jitter. + */ + public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) { + this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter); } /** @@ -101,11 +112,19 @@ public class ChoreService implements ChoreServicer { * to during initialization. The default size is 1, but specifying a larger size may be * beneficial if you know that 1 thread will not be enough. */ - public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) { + public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) { this.coreThreadPoolPrefix = coreThreadPoolPrefix; - if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE; + if (corePoolSize < MIN_CORE_POOL_SIZE) { + corePoolSize = MIN_CORE_POOL_SIZE; + } + final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); - scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + if (jitter) { + scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1); + } else { + scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + } + scheduler.setRemoveOnCancelPolicy(true); scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>(); choresMissingStartTime = new HashMap<ScheduledChore, Boolean>(); @@ -127,7 +146,9 @@ public class ChoreService implements ChoreServicer { * (typically occurs when a chore is scheduled during shutdown of service) */ public synchronized boolean scheduleChore(ScheduledChore chore) { - if (chore == null) return false; 
+ if (chore == null) { + return false; + } try { chore.setChoreServicer(this); http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java new file mode 100644 index 0000000..95efa5a --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java @@ -0,0 +1,123 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase; + +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * ScheduledThreadPoolExecutor that will add some jitter to the RunnableScheduledFuture.getDelay. + * + * This will spread out things on a distributed cluster. + */ +public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor { + private final double spread; + + /** + * Main constructor. + * @param spread The fraction (e.g. 0.1 for 10%) up and down that RunnableScheduledFuture.getDelay should be jittered. + */ + public JitterScheduledThreadPoolExecutorImpl(int corePoolSize, + ThreadFactory threadFactory, + double spread) { + super(corePoolSize, threadFactory); + this.spread = spread; + } + + protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask( + Runnable runnable, java.util.concurrent.RunnableScheduledFuture<V> task) { + return new JitteredRunnableScheduledFuture<>(task); + } + + + protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask( + Callable<V> callable, java.util.concurrent.RunnableScheduledFuture<V> task) { + return new JitteredRunnableScheduledFuture<>(task); + } + + /** + * Class that basically just defers to the wrapped future.
+ * The exceptions are getDelay, which adds the jitter, and compareTo, which orders by the wrapped futures. + */ + protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> { + private final RunnableScheduledFuture<V> wrapped; + JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) { + this.wrapped = wrapped; + } + + @Override + public boolean isPeriodic() { + return wrapped.isPeriodic(); + } + + @Override + public long getDelay(TimeUnit unit) { + long baseDelay = wrapped.getDelay(unit); + long spreadTime = (long) (baseDelay * spread); + // No jitter when spreadTime <= 0 (due/overdue task; nextLong needs origin < bound) or on rollover. + long delay = spreadTime <= 0 ? baseDelay : baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime); + return (delay < 0) ? baseDelay : delay; + } + + @Override + public int compareTo(Delayed o) { + return wrapped.compareTo(o instanceof JitteredRunnableScheduledFuture ? ((JitteredRunnableScheduledFuture<?>) o).wrapped : o); + } + + @Override + public void run() { + wrapped.run(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return wrapped.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return wrapped.isCancelled(); + } + + @Override + public boolean isDone() { + return wrapped.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return wrapped.get(); + } + + @Override + public V get(long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return wrapped.get(timeout, unit); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java index b113174..d1a6c19 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java @@ -315,7 +315,7 @@ public class
TestChoreService { final int corePoolSize = 10; final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE; - ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize); + ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false); try { assertEquals(corePoolSize, customInit.getCorePoolSize()); } finally { @@ -329,11 +329,11 @@ public class TestChoreService { shutdownService(defaultInit); } - ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10); + ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10, false); try { assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize()); } finally { - shutdownService(invalidInit); + shutdownService(invalidInit); } } @@ -403,7 +403,7 @@ public class TestChoreService { @Test (timeout=20000) public void testCorePoolIncrease() throws InterruptedException { final int initialCorePoolSize = 3; - ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize); + ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false); try { assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize, @@ -443,7 +443,7 @@ public class TestChoreService { @Test(timeout = 30000) public void testCorePoolDecrease() throws InterruptedException { final int initialCorePoolSize = 3; - ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize); + ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false); final int chorePeriod = 100; try { // Slow chores always miss their start time and thus the core pool size should be at least as http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2ce2193..211fed5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -605,7 +605,7 @@ public class HRegionServer extends HasThread implements rpcServices.start(); putUpWebUI(); this.walRoller = new LogRoller(this, this); - this.choreService = new ChoreService(getServerName().toString()); + this.choreService = new ChoreService(getServerName().toString(), true); if (!SystemUtils.IS_OS_WINDOWS) { Signal.handle(new Signal("HUP"), new SignalHandler() { @@ -1574,8 +1574,8 @@ public class HRegionServer extends HasThread implements static class PeriodicMemstoreFlusher extends ScheduledChore { final HRegionServer server; - final static int RANGE_OF_DELAY = 20000; //millisec - final static int MIN_DELAY_TIME = 3000; //millisec + final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds + final static int MIN_DELAY_TIME = 0; // millisec public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval); this.server = server;