HBASE-18451 PeriodicMemstoreFlusher should inspect the queue before adding a 
delayed flush request

Signed-off-by: Andrew Purtell <apurt...@apache.org>

Conflicts:
        
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
        
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/785e21fe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/785e21fe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/785e21fe

Branch: refs/heads/branch-1.3
Commit: 785e21fe545da33811a50e0718d7cfeb7dc74df7
Parents: a4baeeb
Author: xcang <xc...@salesforce.com>
Authored: Sun Sep 23 23:42:57 2018 -0700
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Wed Dec 12 19:25:50 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/FlushRequester.java       |  6 ++++--
 .../apache/hadoop/hbase/regionserver/HRegionServer.java | 12 +++++++-----
 .../hadoop/hbase/regionserver/MemStoreFlusher.java      | 12 ++++++++----
 .../hbase/regionserver/TestHeapMemoryManager.java       |  7 ++++---
 .../hadoop/hbase/regionserver/wal/TestWALReplay.java    | 11 +++++------
 5 files changed, 28 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index c7e155a..243546c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -32,8 +32,9 @@ public interface FlushRequester {
    * @param region the Region requesting the cache flush
    * @param forceFlushAllStores whether we want to flush all stores. e.g., 
when request from log
    *          rolling.
+   * @return true if our region is added into the queue, false otherwise
    */
-  void requestFlush(Region region, boolean forceFlushAllStores);
+  boolean requestFlush(Region region, boolean forceFlushAllStores);
 
   /**
    * Tell the listener the cache needs to be flushed after a delay
@@ -42,8 +43,9 @@ public interface FlushRequester {
    * @param delay after how much time should the flush happen
    * @param forceFlushAllStores whether we want to flush all stores. e.g., 
when request from log
    *          rolling.
+   * @return true if our region is added into the queue, false otherwise
    */
-  void requestDelayedFlush(Region region, long delay, boolean 
forceFlushAllStores);
+  boolean requestDelayedFlush(Region region, long delay, boolean 
forceFlushAllStores);
 
   /**
    * Register a FlushRequestListener

http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 21f269e..bbf488a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -28,6 +28,7 @@ import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1635,17 +1636,18 @@ public class HRegionServer extends HasThread implements
       final StringBuffer whyFlush = new StringBuffer();
       for (Region r : this.server.onlineRegions.values()) {
         if (r == null) continue;
-        if (((HRegion)r).shouldFlush(whyFlush)) {
+        if (((HRegion) r).shouldFlush(whyFlush)) {
           FlushRequester requester = server.getFlushRequester();
           if (requester != null) {
             long randomDelay = (long) RandomUtils.nextInt(rangeOfDelay) + 
MIN_DELAY_TIME;
-            LOG.info(getName() + " requesting flush of " +
-              r.getRegionInfo().getRegionNameAsString() + " because " +
-              whyFlush.toString() + " after random delay " + randomDelay + 
"ms");
             //Throttle the flushes by putting a delay. If we don't throttle, 
and there
             //is a balanced write-load on the regions in a table, we might end 
up
             //overwhelming the filesystem with too many flushes at once.
-            requester.requestDelayedFlush(r, randomDelay, false);
+            if (requester.requestDelayedFlush(r, randomDelay, false)) {
+              LOG.info(getName() + " requesting flush of " +
+                  r.getRegionInfo().getRegionNameAsString() + " because " +
+                  whyFlush.toString() + " after random delay " + randomDelay + 
"ms");
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index b4adea6..80458bc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -360,8 +360,7 @@ class MemStoreFlusher implements FlushRequester {
   }
 
   @Override
-  public void requestFlush(Region r, boolean forceFlushAllStores) {
-    ((HRegion)r).incrementFlushesQueuedCount();
+  public boolean requestFlush(Region r, boolean forceFlushAllStores) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
@@ -369,13 +368,15 @@ class MemStoreFlusher implements FlushRequester {
         FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
+        ((HRegion)r).incrementFlushesQueuedCount();
+        return true;
       }
+      return false;
     }
   }
 
   @Override
-  public void requestDelayedFlush(Region r, long delay, boolean 
forceFlushAllStores) {
-    ((HRegion)r).incrementFlushesQueuedCount();
+  public boolean requestDelayedFlush(Region r, long delay, boolean 
forceFlushAllStores) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has some delay
@@ -383,7 +384,10 @@ class MemStoreFlusher implements FlushRequester {
         fqe.requeue(delay);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
+        ((HRegion)r).incrementFlushesQueuedCount();
+        return true;
       }
+      return false;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index 0e72d0d..ddc5558 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -633,13 +633,14 @@ public class TestHeapMemoryManager {
     }
 
     @Override
-    public void requestFlush(Region region, boolean forceFlushAllStores) {
+    public boolean requestFlush(Region region, boolean forceFlushAllStores) {
       this.listener.flushRequested(flushType, region);
+      return true;
     }
 
     @Override
-    public void requestDelayedFlush(Region region, long delay, boolean 
forceFlushAllStores) {
-
+    public boolean requestDelayedFlush(Region region, long delay, boolean 
forceFlushAllStores) {
+      return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/785e21fe/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 5240f2a..8758c98 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -1136,21 +1136,20 @@ public class TestWALReplay {
   // Flusher used in this test.  Keep count of how often we are called and
   // actually run the flush inside here.
   class TestFlusher implements FlushRequester {
-    private HRegion r;
 
     @Override
-    public void requestFlush(Region region, boolean force) {
+    public boolean requestFlush(Region region, boolean force) {
       try {
-        r.flush(force);
+        region.flush(force);
+        return true;
       } catch (IOException e) {
         throw new RuntimeException("Exception flushing", e);
       }
     }
 
     @Override
-    public void requestDelayedFlush(Region region, long when, boolean 
forceFlushAllStores) {
-      // TODO Auto-generated method stub
-
+    public boolean requestDelayedFlush(Region region, long when, boolean 
forceFlushAllStores) {
+      return true;
     }
 
     @Override

Reply via email to