HBASE-18451 PeriodicMemstoreFlusher should inspect the queue before adding a delayed flush request
Signed-off-by: Andrew Purtell <apurt...@apache.org> Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/785e21fe Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/785e21fe Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/785e21fe Branch: refs/heads/branch-1.3 Commit: 785e21fe545da33811a50e0718d7cfeb7dc74df7 Parents: a4baeeb Author: xcang <xc...@salesforce.com> Authored: Sun Sep 23 23:42:57 2018 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Dec 12 19:25:50 2018 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/FlushRequester.java | 6 ++++-- .../apache/hadoop/hbase/regionserver/HRegionServer.java | 12 +++++++----- .../hadoop/hbase/regionserver/MemStoreFlusher.java | 12 ++++++++---- .../hbase/regionserver/TestHeapMemoryManager.java | 7 ++++--- .../hadoop/hbase/regionserver/wal/TestWALReplay.java | 11 +++++------ 5 files changed, 28 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java index c7e155a..243546c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -32,8 +32,9 @@ public interface FlushRequester { * @param region the Region requesting the cache flush * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * rolling. + * @return true if our region is added into the queue, false otherwise */ - void requestFlush(Region region, boolean forceFlushAllStores); + boolean requestFlush(Region region, boolean forceFlushAllStores); /** * Tell the listener the cache needs to be flushed after a delay @@ -42,8 +43,9 @@ public interface FlushRequester { * @param delay after how much time should the flush happen * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * rolling. + * @return true if our region is added into the queue, false otherwise */ - void requestDelayedFlush(Region region, long delay, boolean forceFlushAllStores); + boolean requestDelayedFlush(Region region, long delay, boolean forceFlushAllStores); /** * Register a FlushRequestListener http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 21f269e..bbf488a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -28,6 +28,7 @@ import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -1635,17 +1636,18 @@ public class HRegionServer extends HasThread implements final StringBuffer whyFlush = new StringBuffer(); for (Region r : this.server.onlineRegions.values()) { if (r == null) continue; - if (((HRegion)r).shouldFlush(whyFlush)) { + if (((HRegion) r).shouldFlush(whyFlush)) { FlushRequester requester = server.getFlushRequester(); if (requester != null) { long randomDelay = (long) RandomUtils.nextInt(rangeOfDelay) + MIN_DELAY_TIME; - LOG.info(getName() + " requesting flush of " + - r.getRegionInfo().getRegionNameAsString() + " because " + - whyFlush.toString() + " after random delay " + randomDelay + "ms"); //Throttle the flushes by putting a delay. If we don't throttle, and there //is a balanced write-load on the regions in a table, we might end up //overwhelming the filesystem with too many flushes at once. - requester.requestDelayedFlush(r, randomDelay, false); + if (requester.requestDelayedFlush(r, randomDelay, false)) { + LOG.info(getName() + " requesting flush of " + + r.getRegionInfo().getRegionNameAsString() + " because " + + whyFlush.toString() + " after random delay " + randomDelay + "ms"); + } } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index b4adea6..80458bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -360,8 +360,7 @@ class MemStoreFlusher implements FlushRequester { } @Override - public void requestFlush(Region r, boolean forceFlushAllStores) { - ((HRegion)r).incrementFlushesQueuedCount(); + public boolean requestFlush(Region r, boolean forceFlushAllStores) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush @@ -369,13 +368,15 @@ class MemStoreFlusher implements FlushRequester { FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); + ((HRegion)r).incrementFlushesQueuedCount(); + return true; } + return false; } } @Override - public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) { - ((HRegion)r).incrementFlushesQueuedCount(); + public boolean requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has some delay @@ -383,7 +384,10 @@ class MemStoreFlusher implements FlushRequester { fqe.requeue(delay); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); + ((HRegion)r).incrementFlushesQueuedCount(); + return true; } + return false; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 0e72d0d..ddc5558 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -633,13 +633,14 @@ public class TestHeapMemoryManager { } @Override - public void requestFlush(Region region, boolean forceFlushAllStores) { + public boolean requestFlush(Region region, boolean forceFlushAllStores) { this.listener.flushRequested(flushType, region); + return true; } @Override - public void requestDelayedFlush(Region region, long delay, boolean forceFlushAllStores) { - + public boolean requestDelayedFlush(Region region, long delay, boolean forceFlushAllStores) { + return true; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 5240f2a..8758c98 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -1136,21 +1136,20 @@ public class TestWALReplay { // Flusher used in this test. Keep count of how often we are called and // actually run the flush inside here. class TestFlusher implements FlushRequester { - private HRegion r; @Override - public void requestFlush(Region region, boolean force) { + public boolean requestFlush(Region region, boolean force) { try { - r.flush(force); + region.flush(force); + return true; } catch (IOException e) { throw new RuntimeException("Exception flushing", e); } } @Override - public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) { - // TODO Auto-generated method stub - + public boolean requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) { + return true; } @Override