Repository: hbase Updated Branches: refs/heads/branch-1 0b6e2907b -> f1111e6f9 refs/heads/branch-1.4 8a7ff12c5 -> 709feeae9 refs/heads/branch-2 818b33756 -> c9c861dbb refs/heads/branch-2.1 8dea60079 -> e26a6e0e1 refs/heads/master 6bc7089f9 -> 704f8b81b
HBASE-18451 PeriodicMemstoreFlusher should inspect the queue before adding a delayed flush request, fix logging Signed-off-by: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/704f8b81 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/704f8b81 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/704f8b81 Branch: refs/heads/master Commit: 704f8b81bbbad2615c281017f36a57599f34a44b Parents: 6bc7089 Author: xcang <xc...@salesforce.com> Authored: Tue Sep 25 08:50:05 2018 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Fri Sep 28 11:50:08 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/FlushRequester.java | 6 ++++-- .../hadoop/hbase/regionserver/HRegionServer.java | 10 +++++----- .../hadoop/hbase/regionserver/MemStoreFlusher.java | 13 +++++++++---- .../hbase/regionserver/TestHeapMemoryManager.java | 7 ++++--- .../hbase/regionserver/wal/AbstractTestWALReplay.java | 6 ++++-- 5 files changed, 26 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/704f8b81/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java index c54f771..4191fbf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -32,8 +32,9 @@ public interface FlushRequester { * @param region the Region requesting the cache flush * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * rolling. + * @return true if our region is added into the queue, false otherwise */ - void requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker); + boolean requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker); /** * Tell the listener the cache needs to be flushed after a delay @@ -42,8 +43,9 @@ public interface FlushRequester { * @param delay after how much time should the flush happen * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * rolling. + * @return true if our region is added into the queue, false otherwise */ - void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores); + boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores); /** * Register a FlushRequestListener http://git-wip-us.apache.org/repos/asf/hbase/blob/704f8b81/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8066714..33abaae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1773,14 +1773,14 @@ public class HRegionServer extends HasThread implements FlushRequester requester = server.getFlushRequester(); if (requester != null) { long randomDelay = (long) RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME; - LOG.info(getName() + " requesting flush of " + - r.getRegionInfo().getRegionNameAsString() + " because " + - whyFlush.toString() + - " after random delay " + randomDelay + "ms"); //Throttle the flushes by putting a delay. If we don't throttle, and there //is a balanced write-load on the regions in a table, we might end up //overwhelming the filesystem with too many flushes at once. - requester.requestDelayedFlush(r, randomDelay, false); + if (requester.requestDelayedFlush(r, randomDelay, false)) { + LOG.info("{} requesting flush of {} because {} after random delay {} ms", + getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), + randomDelay); + } } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/704f8b81/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 2a49797..699c9b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -460,8 +460,8 @@ class MemStoreFlusher implements FlushRequester { } @Override - public void requestFlush(HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) { - r.incrementFlushesQueuedCount(); + public boolean requestFlush(HRegion r, boolean forceFlushAllStores, + FlushLifeCycleTracker tracker) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush @@ -469,15 +469,17 @@ class MemStoreFlusher implements FlushRequester { FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); + r.incrementFlushesQueuedCount(); + return true; } else { tracker.notExecuted("Flush already requested on " + r); + return false; } } } @Override - public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) { - r.incrementFlushesQueuedCount(); + public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has some delay @@ -486,7 +488,10 @@ class MemStoreFlusher implements FlushRequester { fqe.requeue(delay); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); + r.incrementFlushesQueuedCount(); + return true; } + return false; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/704f8b81/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 41bfd48..9f05a73 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -758,14 +758,15 @@ public class TestHeapMemoryManager { } @Override - public void requestFlush(HRegion region, boolean forceFlushAllStores, + public boolean requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) { this.listener.flushRequested(flushType, region); + return true; } @Override - public void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) { - + public boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) { + return true; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/704f8b81/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 225fbb3..3f9040b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1110,16 +1110,18 @@ public abstract class AbstractTestWALReplay { private HRegion r; @Override - public void requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) { + public boolean requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) { try { r.flush(force); + return true; } catch (IOException e) { throw new RuntimeException("Exception flushing", e); } } @Override - public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) { + public boolean requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) { + return true; } @Override