HBASE-18905 Allow CPs to request flush on Region and know the completion of the 
requested flush


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca79a915
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca79a915
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca79a915

Branch: refs/heads/branch-2
Commit: ca79a9156669f4f635b5ffd5dae115f870eef3ca
Parents: c2dbef1
Author: zhangduo <zhang...@apache.org>
Authored: Wed Oct 25 11:00:44 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Wed Oct 25 20:45:53 2017 +0800

----------------------------------------------------------------------
 .../example/ZooKeeperScanPolicyObserver.java    |   3 +-
 .../hbase/coprocessor/RegionObserver.java       |  15 +-
 .../hbase/mob/DefaultMobStoreFlusher.java       |   6 +-
 .../hbase/regionserver/DefaultStoreFlusher.java |   5 +-
 .../regionserver/FlushLifeCycleTracker.java     |  51 ++++
 .../hbase/regionserver/FlushRequester.java      |   2 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 118 +++++----
 .../hadoop/hbase/regionserver/HStore.java       |  20 +-
 .../hadoop/hbase/regionserver/LogRoller.java    |   2 +-
 .../hbase/regionserver/MemStoreFlusher.java     |  67 +++---
 .../hbase/regionserver/RSRpcServices.java       |   3 +-
 .../hadoop/hbase/regionserver/Region.java       |   5 +
 .../regionserver/RegionCoprocessorHost.java     |  23 +-
 .../hadoop/hbase/regionserver/StoreFlusher.java |   9 +-
 .../hbase/regionserver/StripeStoreFlusher.java  |   5 +-
 .../hbase/security/access/AccessController.java |   6 +-
 .../client/TestMobCloneSnapshotFromClient.java  |   4 +-
 .../hbase/coprocessor/SimpleRegionObserver.java |   5 +-
 .../coprocessor/TestCoprocessorInterface.java   |   9 +-
 .../TestRegionObserverInterface.java            |   4 +-
 .../TestRegionObserverScannerOpenHook.java      |   3 +-
 .../regionserver/NoOpScanPolicyObserver.java    |   2 +-
 .../regionserver/TestFlushLifeCycleTracker.java | 240 +++++++++++++++++++
 .../regionserver/TestFlushRegionEntry.java      |  43 ++--
 .../hbase/regionserver/TestHMobStore.java       |   2 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |  17 +-
 .../regionserver/TestHRegionReplayEvents.java   |   8 +-
 .../hadoop/hbase/regionserver/TestHStore.java   |  10 +-
 .../regionserver/TestHeapMemoryManager.java     |  75 +++---
 .../regionserver/TestSplitWalDataLoss.java      |  11 +-
 .../regionserver/wal/AbstractTestWALReplay.java |  20 +-
 .../security/access/TestAccessController.java   |   4 +-
 .../access/TestWithDisabledAuthorization.java   |   4 +-
 .../hbase/util/TestCoprocessorScanPolicy.java   |   3 +-
 34 files changed, 589 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
----------------------------------------------------------------------
diff --git 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
index f849c86..d6d66bb 100644
--- 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
+++ 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -188,7 +189,7 @@ public class ZooKeeperScanPolicyObserver implements 
RegionCoprocessor, RegionObs
 
   @Override
   public InternalScanner 
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner) throws IOException {
+      InternalScanner scanner, FlushLifeCycleTracker tracker) throws 
IOException {
     return wrap(scanner);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 5c89149..2ca1495 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
@@ -123,37 +124,43 @@ public interface RegionObserver {
   /**
    * Called before the memstore is flushed to disk.
    * @param c the environment provided by the region server
+   * @param tracker tracker used to track the life cycle of a flush
    */
-  default void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c) 
throws IOException {}
+  default void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c,
+      FlushLifeCycleTracker tracker) throws IOException {}
 
   /**
    * Called before a Store's memstore is flushed to disk.
    * @param c the environment provided by the region server
    * @param store the store where compaction is being requested
    * @param scanner the scanner over existing data used in the store file
+   * @param tracker tracker used to track the life cycle of a flush
    * @return the scanner to use during compaction.  Should not be {@code null}
    * unless the implementation is writing new store files on its own.
    */
   default InternalScanner 
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner) throws IOException {
+      InternalScanner scanner, FlushLifeCycleTracker tracker) throws 
IOException {
     return scanner;
   }
 
   /**
    * Called after the memstore is flushed to disk.
    * @param c the environment provided by the region server
+   * @param tracker tracker used to track the life cycle of a flush
    * @throws IOException if an error occurred on the coprocessor
    */
-  default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c) 
throws IOException {}
+  default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+      FlushLifeCycleTracker tracker) throws IOException {}
 
   /**
    * Called after a Store's memstore is flushed to disk.
    * @param c the environment provided by the region server
    * @param store the store being flushed
    * @param resultFile the new store file written out during compaction
+   * @param tracker tracker used to track the life cycle of a flush
    */
   default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, 
Store store,
-      StoreFile resultFile) throws IOException {}
+      StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException 
{}
 
   /**
    * Called prior to selecting the {@link StoreFile StoreFiles} to compact 
from the list of

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index 738f3bc..1bc8068 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -100,14 +101,15 @@ public class DefaultMobStoreFlusher extends 
DefaultStoreFlusher {
    */
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-      MonitoredTask status, ThroughputController throughputController) throws 
IOException {
+      MonitoredTask status, ThroughputController throughputController,
+      FlushLifeCycleTracker tracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     long cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
 
     // Use a store scanner to find which rows to flush.
     long smallestReadPoint = store.getSmallestReadPoint();
-    InternalScanner scanner = createScanner(snapshot.getScanners(), 
smallestReadPoint);
+    InternalScanner scanner = createScanner(snapshot.getScanners(), 
smallestReadPoint, tracker);
     if (scanner == null) {
       return result; // NULL scanner returned from coprocessor hooks means 
skip normal processing
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 2e907e8..06d4752 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -45,14 +45,15 @@ public class DefaultStoreFlusher extends StoreFlusher {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-      MonitoredTask status, ThroughputController throughputController) throws 
IOException {
+      MonitoredTask status, ThroughputController throughputController,
+      FlushLifeCycleTracker tracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
 
     // Use a store scanner to find which rows to flush.
     long smallestReadPoint = store.getSmallestReadPoint();
-    InternalScanner scanner = createScanner(snapshot.getScanners(), 
smallestReadPoint);
+    InternalScanner scanner = createScanner(snapshot.getScanners(), 
smallestReadPoint, tracker);
     if (scanner == null) {
       return result; // NULL scanner returned from coprocessor hooks means 
skip normal processing
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLifeCycleTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLifeCycleTracker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLifeCycleTracker.java
new file mode 100644
index 0000000..f806023
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLifeCycleTracker.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Used to track flush execution.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface FlushLifeCycleTracker {
+
+  static FlushLifeCycleTracker DUMMY = new FlushLifeCycleTracker() {
+  };
+
+  /**
+   * Called if the flush request fails for some reason.
+   */
+  default void notExecuted(String reason) {
+  }
+
+  /**
+   * Called before flush is executed.
+   */
+  default void beforeExecution() {
+  }
+
+  /**
+   * Called after flush is executed.
+   */
+  default void afterExecution() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index 931a737..c54f771 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -33,7 +33,7 @@ public interface FlushRequester {
    * @param forceFlushAllStores whether we want to flush all stores. e.g., 
when request from log
    *          rolling.
    */
-  void requestFlush(HRegion region, boolean forceFlushAllStores);
+  void requestFlush(HRegion region, boolean forceFlushAllStores, 
FlushLifeCycleTracker tracker);
 
   /**
    * Tell the listener the cache needs to be flushed after a delay

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5cbf889..f0c9ec2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2203,7 +2203,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    */
   // TODO HBASE-18905. We might have to expose a requestFlush API for CPs
   public FlushResult flush(boolean force) throws IOException {
-    return flushcache(force, false);
+    return flushcache(force, false, FlushLifeCycleTracker.DUMMY);
   }
 
   public static interface FlushResult {
@@ -2241,6 +2241,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * time-sensitive thread.
    * @param forceFlushAllStores whether we want to flush all stores
    * @param writeFlushRequestWalMarker whether to write the flush request 
marker to WAL
+   * @param tracker used to track the life cycle of this flush
    * @return whether the flush is success and whether the region needs 
compacting
    *
    * @throws IOException general io exceptions
@@ -2248,8 +2249,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * because a Snapshot was not properly persisted. The region is put in 
closing mode, and the
    * caller MUST abort after this.
    */
-  public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean 
writeFlushRequestWalMarker)
-      throws IOException {
+  public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean 
writeFlushRequestWalMarker,
+      FlushLifeCycleTracker tracker) throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
       String msg = "Skipping flush on " + this + " because closing";
@@ -2269,7 +2270,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor pre-flush hooks");
-        coprocessorHost.preFlush();
+        coprocessorHost.preFlush(tracker);
       }
       // TODO: this should be managed within memstore with the snapshot, 
updated only after flush
       // successful
@@ -2298,11 +2299,11 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         Collection<HStore> specificStoresToFlush =
             forceFlushAllStores ? stores.values() : 
flushPolicy.selectStoresToFlush();
         FlushResultImpl fs =
-            internalFlushcache(specificStoresToFlush, status, 
writeFlushRequestWalMarker);
+            internalFlushcache(specificStoresToFlush, status, 
writeFlushRequestWalMarker, tracker);
 
         if (coprocessorHost != null) {
           status.setStatus("Running post-flush coprocessor hooks");
-          coprocessorHost.postFlush();
+          coprocessorHost.postFlush(tracker);
         }
 
         if(fs.isFlushSucceeded()) {
@@ -2398,7 +2399,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @see #internalFlushcache(Collection, MonitoredTask, boolean)
    */
   private FlushResult internalFlushcache(MonitoredTask status) throws 
IOException {
-    return internalFlushcache(stores.values(), status, false);
+    return internalFlushcache(stores.values(), status, false, 
FlushLifeCycleTracker.DUMMY);
   }
 
   /**
@@ -2406,9 +2407,9 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean)
    */
   private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, 
MonitoredTask status,
-      boolean writeFlushWalMarker) throws IOException {
+      boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws 
IOException {
     return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, 
status,
-      writeFlushWalMarker);
+      writeFlushWalMarker, tracker);
   }
 
   /**
@@ -2429,10 +2430,11 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of WAL is required.
    */
-  protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, 
Collection<HStore> storesToFlush,
-      MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
-    PrepareFlushResult result
-      = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, 
writeFlushWalMarker);
+  protected FlushResultImpl internalFlushcache(WAL wal, long myseqid,
+      Collection<HStore> storesToFlush, MonitoredTask status, boolean 
writeFlushWalMarker,
+      FlushLifeCycleTracker tracker) throws IOException {
+    PrepareFlushResult result =
+        internalPrepareFlushCache(wal, myseqid, storesToFlush, status, 
writeFlushWalMarker, tracker);
     if (result.result == null) {
       return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
     } else {
@@ -2443,8 +2445,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
       justification="FindBugs seems confused about trxId")
   protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
-      Collection<HStore> storesToFlush, MonitoredTask status, boolean 
writeFlushWalMarker)
-      throws IOException {
+      Collection<HStore> storesToFlush, MonitoredTask status, boolean 
writeFlushWalMarker,
+      FlushLifeCycleTracker tracker) throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
       throw new IOException("Aborting flush because server is aborted...");
@@ -2469,9 +2471,9 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           if (wal != null) {
             writeEntry = mvcc.begin();
             long flushOpSeqId = writeEntry.getWriteNumber();
-            FlushResultImpl flushResult = new 
FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
-              flushOpSeqId, "Nothing to flush",
-            writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
+            FlushResultImpl flushResult =
+                new 
FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId,
+                    "Nothing to flush", writeFlushRequestMarkerToWAL(wal, 
writeFlushWalMarker));
             mvcc.completeAndWait(writeEntry);
             // Set to null so we don't complete it again down in finally block.
             writeEntry = null;
@@ -2547,8 +2549,9 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         MemStoreSize flushableSize = s.getFlushableSize();
         totalSizeOfFlushableStores.incMemStoreSize(flushableSize);
         storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(),
-            s.createFlushContext(flushOpSeqId));
-        committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); // 
for writing stores to WAL
+          s.createFlushContext(flushOpSeqId, tracker));
+        // for writing stores to WAL
+        committedFiles.put(s.getColumnFamilyDescriptor().getName(), null);
         storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), 
flushableSize);
       }
 
@@ -4079,29 +4082,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     }
   }
 
-  private void requestFlushIfNeeded(long memstoreTotalSize) throws 
RegionTooBusyException {
-    if(memstoreTotalSize > this.getMemStoreFlushSize()) {
-      requestFlush();
-    }
-  }
-
-  private void requestFlush() {
-    if (this.rsServices == null) {
-      return;
-    }
-    synchronized (writestate) {
-      if (this.writestate.isFlushRequested()) {
-        return;
-      }
-      writestate.flushRequested = true;
-    }
-    // Make request outside of synchronize block; HBASE-818.
-    this.rsServices.getFlushRequester().requestFlush(this, false);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
-    }
-  }
-
   /*
    * @param size
    * @return True if size is over the flush threshold
@@ -4216,7 +4196,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     }
     if (seqid > minSeqIdForTheRegion) {
       // Then we added some edits to memory. Flush and cleanup split edit 
files.
-      internalFlushcache(null, seqid, stores.values(), status, false);
+      internalFlushcache(null, seqid, stores.values(), status, false, 
FlushLifeCycleTracker.DUMMY);
     }
     // Now delete the content of recovered edits.  We're done w/ them.
     if (files.size() > 0 && 
this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
@@ -4400,7 +4380,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           }
           flush = isFlushSize(this.addAndGetMemStoreSize(memstoreSize));
           if (flush) {
-            internalFlushcache(null, currentEditSeqId, stores.values(), 
status, false);
+            internalFlushcache(null, currentEditSeqId, stores.values(), 
status, false,
+              FlushLifeCycleTracker.DUMMY);
           }
 
           if (coprocessorHost != null) {
@@ -4603,8 +4584,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           // we can just snapshot our memstores and continue as normal.
 
           // invoke prepareFlushCache. Send null as wal since we do not want 
the flush events in wal
-          PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
-            flushSeqId, storesToFlush, status, false);
+          PrepareFlushResult prepareResult = internalPrepareFlushCache(null, 
flushSeqId,
+            storesToFlush, status, false, FlushLifeCycleTracker.DUMMY);
           if (prepareResult.result == null) {
             // save the PrepareFlushResult so that we can use it later from 
commit flush
             this.writestate.flushing = true;
@@ -4818,7 +4799,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       StoreFlushContext ctx = null;
       long startTime = EnvironmentEdgeManager.currentTime();
       if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == 
null) {
-        ctx = store.createFlushContext(flush.getFlushSequenceNumber());
+        ctx = store.createFlushContext(flush.getFlushSequenceNumber(), 
FlushLifeCycleTracker.DUMMY);
       } else {
         ctx = prepareFlushResult.storeFlushCtxs.get(family);
         startTime = prepareFlushResult.startTime;
@@ -4878,7 +4859,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       throws IOException {
     MemStoreSize flushableSize = s.getFlushableSize();
     this.decrMemStoreSize(flushableSize);
-    StoreFlushContext ctx = s.createFlushContext(currentSeqId);
+    StoreFlushContext ctx = s.createFlushContext(currentSeqId, 
FlushLifeCycleTracker.DUMMY);
     ctx.prepare();
     ctx.abort();
     return flushableSize;
@@ -5724,7 +5705,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       // guaranteed to be one beyond the file made when we flushed (or if 
nothing to flush, it is
       // a sequence id that we can be sure is beyond the last hfile written).
       if (assignSeqId) {
-        FlushResult fs = flushcache(true, false);
+        FlushResult fs = flushcache(true, false, FlushLifeCycleTracker.DUMMY);
         if (fs.isFlushSucceeded()) {
           seqId = ((FlushResultImpl)fs).flushSequenceId;
         } else if (fs.getResult() == 
FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@@ -8234,4 +8215,41 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     rsServices.getCompactionRequestor().requestCompaction(this, store, why, 
priority, tracker,
       RpcServer.getRequestUser().orElse(null));
   }
+
+  private void requestFlushIfNeeded(long memstoreTotalSize) throws 
RegionTooBusyException {
+    if (memstoreTotalSize > this.getMemStoreFlushSize()) {
+      requestFlush();
+    }
+  }
+
+  private void requestFlush() {
+    if (this.rsServices == null) {
+      return;
+    }
+    requestFlush0(FlushLifeCycleTracker.DUMMY);
+  }
+
+  private void requestFlush0(FlushLifeCycleTracker tracker) {
+    boolean shouldFlush = false;
+    synchronized (writestate) {
+      if (!this.writestate.isFlushRequested()) {
+        shouldFlush = true;
+        writestate.flushRequested = true;
+      }
+    }
+    if (shouldFlush) {
+      // Make request outside of synchronize block; HBASE-818.
+      this.rsServices.getFlushRequester().requestFlush(this, false, tracker);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Flush requested on " + 
this.getRegionInfo().getEncodedName());
+      }
+    } else {
+      tracker.notExecuted("Flush already requested on " + this);
+    }
+  }
+
+  @Override
+  public void requestFlush(FlushLifeCycleTracker tracker) throws IOException {
+    requestFlush0(tracker);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index e3d6724..52db699 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -952,7 +952,8 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation, Propagat
    * @throws IOException if exception occurs during process
    */
   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot 
snapshot,
-      MonitoredTask status, ThroughputController throughputController) throws 
IOException {
+      MonitoredTask status, ThroughputController throughputController,
+      FlushLifeCycleTracker tracker) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
@@ -963,7 +964,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation, Propagat
     for (int i = 0; i < flushRetriesNumber; i++) {
       try {
         List<Path> pathNames =
-            flusher.flushSnapshot(snapshot, logCacheFlushId, status, 
throughputController);
+            flusher.flushSnapshot(snapshot, logCacheFlushId, status, 
throughputController, tracker);
         Path lastPathName = null;
         try {
           for (Path pathName : pathNames) {
@@ -2152,13 +2153,14 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation, Propagat
     }
   }
 
-  public StoreFlushContext createFlushContext(long cacheFlushId) {
-    return new StoreFlusherImpl(cacheFlushId);
+  public StoreFlushContext createFlushContext(long cacheFlushId, 
FlushLifeCycleTracker tracker) {
+    return new StoreFlusherImpl(cacheFlushId, tracker);
   }
 
   private final class StoreFlusherImpl implements StoreFlushContext {
 
-    private long cacheFlushSeqNum;
+    private final FlushLifeCycleTracker tracker;
+    private final long cacheFlushSeqNum;
     private MemStoreSnapshot snapshot;
     private List<Path> tempFiles;
     private List<Path> committedFiles;
@@ -2166,8 +2168,9 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation, Propagat
     private long cacheFlushSize;
     private long outputFileSize;
 
-    private StoreFlusherImpl(long cacheFlushSeqNum) {
+    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker 
tracker) {
       this.cacheFlushSeqNum = cacheFlushSeqNum;
+      this.tracker = tracker;
     }
 
     /**
@@ -2188,7 +2191,8 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation, Propagat
       RegionServerServices rsService = region.getRegionServerServices();
       ThroughputController throughputController =
           rsService == null ? null : rsService.getFlushThroughputController();
-      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, 
throughputController);
+      tempFiles =
+          HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, 
throughputController, tracker);
     }
 
     @Override
@@ -2220,7 +2224,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation, Propagat
 
       for (HStoreFile sf : storeFiles) {
         if (HStore.this.getCoprocessorHost() != null) {
-          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
+          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
         }
         committedFiles.add(sf.getPath());
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 20b6c5f..dbdb27a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -210,7 +210,7 @@ public class LogRoller extends HasThread implements 
Closeable {
       requester = this.services.getFlushRequester();
       if (requester != null) {
         // force flushing all stores to clean old logs
-        requester.requestFlush(r, true);
+        requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
         scheduled = true;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 82390bd..ae4c8eb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.apache.hadoop.util.StringUtils.humanReadableInt;
-
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
@@ -44,20 +42,20 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 
 /**
  * Thread that flushes cache on request
@@ -183,12 +181,12 @@ class MemStoreFlusher implements FlushRequester {
            
ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
            (bestRegionReplica.getMemStoreSize()
                > secondaryMultiplier * regionToFlush.getMemStoreSize()))) {
-        LOG.info("Refreshing storefiles of region " + bestRegionReplica
-            + " due to global heap pressure. Total memstore datasize="
-            + StringUtils
-                
.humanReadableInt(server.getRegionServerAccounting().getGlobalMemStoreDataSize())
-            + " memstore heap size=" + StringUtils.humanReadableInt(
-                
server.getRegionServerAccounting().getGlobalMemStoreHeapSize()));
+        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
+            " due to global heap pressure. Total memstore datasize=" +
+            TraditionalBinaryPrefix.long2String(
+              server.getRegionServerAccounting().getGlobalMemStoreDataSize(), 
"", 1) +
+            " memstore heap size=" + TraditionalBinaryPrefix.long2String(
+              server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 
"", 1));
         flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
         if (!flushedOne) {
           LOG.info("Excluding secondary region " + bestRegionReplica +
@@ -196,12 +194,13 @@ class MemStoreFlusher implements FlushRequester {
           excludedRegions.add(bestRegionReplica);
         }
       } else {
-        LOG.info("Flush of region " + regionToFlush + " due to global heap 
pressure. "
-            + "Total Memstore size="
-            + 
humanReadableInt(server.getRegionServerAccounting().getGlobalMemStoreDataSize())
-            + ", Region memstore size="
-            + humanReadableInt(regionToFlush.getMemStoreSize()));
-        flushedOne = flushRegion(regionToFlush, true, false);
+        LOG.info("Flush of region " + regionToFlush + " due to global heap 
pressure. " +
+            "Total Memstore size=" +
+            TraditionalBinaryPrefix.long2String(
+              server.getRegionServerAccounting().getGlobalMemStoreDataSize(), 
"", 1) +
+            ", Region memstore size=" +
+            
TraditionalBinaryPrefix.long2String(regionToFlush.getMemStoreSize(), "", 1));
+        flushedOne = flushRegion(regionToFlush, true, false, 
FlushLifeCycleTracker.DUMMY);
 
         if (!flushedOne) {
           LOG.info("Excluding unflushable region " + regionToFlush +
@@ -348,15 +347,17 @@ class MemStoreFlusher implements FlushRequester {
   }
 
   @Override
-  public void requestFlush(HRegion r, boolean forceFlushAllStores) {
+  public void requestFlush(HRegion r, boolean forceFlushAllStores, 
FlushLifeCycleTracker tracker) {
     r.incrementFlushesQueuedCount();
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
-        // queue.  It'll come out near immediately.
-        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
+        // queue. It'll come out near immediately.
+        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, 
tracker);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
+      } else {
+        tracker.notExecuted("Flush already requested on " + r);
       }
     }
   }
@@ -367,7 +368,8 @@ class MemStoreFlusher implements FlushRequester {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has some delay
-        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
+        FlushRegionEntry fqe =
+            new FlushRegionEntry(r, forceFlushAllStores, 
FlushLifeCycleTracker.DUMMY);
         fqe.requeue(delay);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
@@ -463,7 +465,7 @@ class MemStoreFlusher implements FlushRequester {
         return true;
       }
     }
-    return flushRegion(region, false, fqe.isForceFlushAllStores());
+    return flushRegion(region, false, fqe.isForceFlushAllStores(), 
fqe.getTracker());
   }
 
   /**
@@ -478,22 +480,23 @@ class MemStoreFlusher implements FlushRequester {
    * false, there will be accompanying log messages explaining why the region 
was
    * not flushed.
    */
-  private boolean flushRegion(final HRegion region, final boolean 
emergencyFlush,
-      boolean forceFlushAllStores) {
+  private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean 
forceFlushAllStores,
+      FlushLifeCycleTracker tracker) {
     synchronized (this.regionsInQueue) {
       FlushRegionEntry fqe = this.regionsInQueue.remove(region);
       // Use the start time of the FlushRegionEntry if available
       if (fqe != null && emergencyFlush) {
-        // Need to remove from region from delay queue.  When NOT an
+        // Need to remove from region from delay queue. When NOT an
         // emergencyFlush, then item was removed via a flushQueue.poll.
         flushQueue.remove(fqe);
       }
     }
 
+    tracker.beforeExecution();
     lock.readLock().lock();
     try {
       notifyFlushRequest(region, emergencyFlush);
-      FlushResult flushResult = region.flush(forceFlushAllStores);
+      FlushResult flushResult = region.flushcache(forceFlushAllStores, false, 
tracker);
       boolean shouldCompact = flushResult.isCompactionNeeded();
       // We just want to check the size
       boolean shouldSplit = region.checkSplit() != null;
@@ -523,6 +526,7 @@ class MemStoreFlusher implements FlushRequester {
     } finally {
       lock.readLock().unlock();
       wakeUpIfBlocking();
+      tracker.afterExecution();
     }
     return true;
   }
@@ -732,13 +736,16 @@ class MemStoreFlusher implements FlushRequester {
     private long whenToExpire;
     private int requeueCount = 0;
 
-    private boolean forceFlushAllStores;
+    private final boolean forceFlushAllStores;
+
+    private final FlushLifeCycleTracker tracker;
 
-    FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) {
+    FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, 
FlushLifeCycleTracker tracker) {
       this.region = r;
       this.createTime = EnvironmentEdgeManager.currentTime();
       this.whenToExpire = this.createTime;
       this.forceFlushAllStores = forceFlushAllStores;
+      this.tracker = tracker;
     }
 
     /**
@@ -764,6 +771,10 @@ class MemStoreFlusher implements FlushRequester {
       return forceFlushAllStores;
     }
 
+    public FlushLifeCycleTracker getTracker() {
+      return tracker;
+    }
+
     /**
      * @param when When to expire, when to come up out of the queue.
      * Specify in milliseconds.  This method adds 
EnvironmentEdgeManager.currentTime()

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 28f73aa..5d450cc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1611,7 +1611,8 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
             request.getWriteFlushWalMarker() : false;
         // Go behind the curtain so we can manage writing of the flush WAL 
marker
-        HRegion.FlushResultImpl flushResult = region.flushcache(true, 
writeFlushWalMarker);
+        HRegion.FlushResultImpl flushResult =
+            region.flushcache(true, writeFlushWalMarker, 
FlushLifeCycleTracker.DUMMY);
         boolean compactionNeeded = flushResult.isCompactionNeeded();
         if (compactionNeeded) {
           regionServer.compactSplitThread.requestSystemCompaction(region,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 0c93ed1..c0827cb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -468,4 +468,9 @@ public interface Region extends ConfigurationObserver {
    */
   void requestCompaction(byte[] family, String why, int priority, boolean 
major,
       CompactionLifeCycleTracker tracker) throws IOException;
+
+  /**
+   * Request flush on this region.
+   */
+  void requestFlush(FlushLifeCycleTracker tracker) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 42d7ac9..e25b090 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -669,13 +669,13 @@ public class RegionCoprocessorHost
    * Invoked before a memstore flush
    * @throws IOException
    */
-  public InternalScanner preFlush(HStore store, final InternalScanner scanner)
-      throws IOException {
-    return execOperationWithResult(false, scanner, 
coprocEnvironments.isEmpty() ? null :
-        new ObserverOperationWithResult<RegionObserver, 
InternalScanner>(regionObserverGetter) {
+  public InternalScanner preFlush(HStore store, InternalScanner scanner,
+      FlushLifeCycleTracker tracker) throws IOException {
+    return execOperationWithResult(false, scanner, 
coprocEnvironments.isEmpty() ? null
+        : new ObserverOperationWithResult<RegionObserver, 
InternalScanner>(regionObserverGetter) {
           @Override
           public InternalScanner call(RegionObserver observer) throws 
IOException {
-            return observer.preFlush(this, store, getResult());
+            return observer.preFlush(this, store, getResult(), tracker);
           }
         });
   }
@@ -684,11 +684,11 @@ public class RegionCoprocessorHost
    * Invoked before a memstore flush
    * @throws IOException
    */
-  public void preFlush() throws IOException {
+  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
     execOperation(coprocEnvironments.isEmpty() ? null : new 
RegionObserverOperation() {
       @Override
       public void call(RegionObserver observer) throws IOException {
-        observer.preFlush(this);
+        observer.preFlush(this, tracker);
       }
     });
   }
@@ -697,11 +697,11 @@ public class RegionCoprocessorHost
    * Invoked after a memstore flush
    * @throws IOException
    */
-  public void postFlush() throws IOException {
+  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
     execOperation(coprocEnvironments.isEmpty() ? null : new 
RegionObserverOperation() {
       @Override
       public void call(RegionObserver observer) throws IOException {
-        observer.postFlush(this);
+        observer.postFlush(this, tracker);
       }
     });
   }
@@ -710,11 +710,12 @@ public class RegionCoprocessorHost
    * Invoked after a memstore flush
    * @throws IOException
    */
-  public void postFlush(final HStore store, final HStoreFile storeFile) throws 
IOException {
+  public void postFlush(HStore store, HStoreFile storeFile, 
FlushLifeCycleTracker tracker)
+      throws IOException {
     execOperation(coprocEnvironments.isEmpty() ? null : new 
RegionObserverOperation() {
       @Override
       public void call(RegionObserver observer) throws IOException {
-        observer.postFlush(this, store, storeFile);
+        observer.postFlush(this, store, storeFile, tracker);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 8fde7d5..b0bff10 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -57,7 +57,8 @@ abstract class StoreFlusher {
    * @return List of files written. Can be empty; must not be null.
    */
   public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long 
cacheFlushSeqNum,
-      MonitoredTask status, ThroughputController throughputController) throws 
IOException;
+      MonitoredTask status, ThroughputController throughputController,
+      FlushLifeCycleTracker tracker) throws IOException;
 
   protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
       MonitoredTask status) throws IOException {
@@ -77,15 +78,15 @@ abstract class StoreFlusher {
    * @param smallestReadPoint
    * @return The scanner; null if coprocessor is canceling the flush.
    */
-  protected InternalScanner createScanner(List<KeyValueScanner> 
snapshotScanners,
-      long smallestReadPoint) throws IOException {
+  protected final InternalScanner createScanner(List<KeyValueScanner> 
snapshotScanners,
+      long smallestReadPoint, FlushLifeCycleTracker tracker) throws 
IOException {
     InternalScanner scanner =
         new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), 
snapshotScanners,
             ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, 
HConstants.OLDEST_TIMESTAMP);
     assert scanner != null;
     if (store.getCoprocessorHost() != null) {
       try {
-        return store.getCoprocessorHost().preFlush(store, scanner);
+        return store.getCoprocessorHost().preFlush(store, scanner, tracker);
       } catch (IOException ioe) {
         scanner.close();
         throw ioe;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index c858f8f..259b333 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -56,13 +56,14 @@ public class StripeStoreFlusher extends StoreFlusher {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long 
cacheFlushSeqNum,
-      MonitoredTask status, ThroughputController throughputController) throws 
IOException {
+      MonitoredTask status, ThroughputController throughputController,
+      FlushLifeCycleTracker tracker) throws IOException {
     List<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
 
     long smallestReadPoint = store.getSmallestReadPoint();
-    InternalScanner scanner = createScanner(snapshot.getScanners(), 
smallestReadPoint);
+    InternalScanner scanner = createScanner(snapshot.getScanners(), 
smallestReadPoint, tracker);
     if (scanner == null) {
       return result; // NULL scanner returned from coprocessor hooks means 
skip normal processing
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index be027c5..1d1cf5b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -110,6 +110,7 @@ import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -1592,9 +1593,10 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
   }
 
   @Override
-  public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws 
IOException {
+  public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+      FlushLifeCycleTracker tracker) throws IOException {
     requirePermission(getActiveUser(c), "flush", 
getTableName(c.getEnvironment()), null, null,
-        Action.ADMIN, Action.CREATE);
+      Action.ADMIN, Action.CREATE);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
index 1745c82..8cbd2a5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
@@ -34,6 +34,7 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
 import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -147,7 +148,8 @@ public class TestMobCloneSnapshotFromClient extends 
TestCloneSnapshotFromClient
     }
 
     @Override
-    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) 
throws IOException {
+    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
+        FlushLifeCycleTracker tracker) throws IOException {
       if (delayFlush) {
         try {
           if (Bytes.compareTo(e.getEnvironment().getRegionInfo().getStartKey(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 71ef484..611d53b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -171,14 +172,14 @@ public class SimpleRegionObserver implements 
RegionCoprocessor, RegionObserver {
 
   @Override
   public InternalScanner 
preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, InternalScanner scanner) throws IOException {
+      Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) 
throws IOException {
     ctPreFlush.incrementAndGet();
     return scanner;
   }
 
   @Override
   public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, StoreFile resultFile) throws IOException {
+      Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) throws 
IOException {
     ctPostFlush.incrementAndGet();
     if (throwOnPostFlush.get()){
       throw new IOException("throwOnPostFlush is true in postFlush");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 22ecd2f..e0d9f5f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
@@ -214,12 +215,16 @@ public class TestCoprocessorInterface {
         CompactionRequest request) {
       postCompactCalled = true;
     }
+
     @Override
-    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
+    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
+        FlushLifeCycleTracker tracker) {
       preFlushCalled = true;
     }
+
     @Override
-    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
+    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
+        FlushLifeCycleTracker tracker) {
       postFlushCalled = true;
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 3205899..49fc3fd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
@@ -465,7 +466,8 @@ public class TestRegionObserverInterface {
     }
 
     @Override
-    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
+    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
+        FlushLifeCycleTracker tracker) {
       lastFlush = EnvironmentEdgeManager.currentTime();
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index 4d6bfec..9e7c184 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -157,7 +158,7 @@ public class TestRegionObserverScannerOpenHook {
 
     @Override
     public InternalScanner 
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-        InternalScanner scanner) throws IOException {
+        InternalScanner scanner, FlushLifeCycleTracker tracker) throws 
IOException {
       return NO_DATA;
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
index cdad850..f0d9f1a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
@@ -45,7 +45,7 @@ public class NoOpScanPolicyObserver implements 
RegionCoprocessor, RegionObserver
 
   @Override
   public InternalScanner 
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner) throws IOException {
+      InternalScanner scanner, FlushLifeCycleTracker tracker) throws 
IOException {
     return new DelegatingInternalScanner(scanner);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushLifeCycleTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushLifeCycleTracker.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushLifeCycleTracker.java
new file mode 100644
index 0000000..6677c18
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushLifeCycleTracker.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Confirm that the function of FlushLifeCycleTracker is OK as we do not use 
it in our own code.
+ */
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestFlushLifeCycleTracker {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName NAME =
+      TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
+
+  private HRegion region;
+
+  private static FlushLifeCycleTracker TRACKER;
+
+  private static volatile CountDownLatch ARRIVE;
+
+  private static volatile CountDownLatch BLOCK;
+
+  public static final class FlushObserver implements RegionObserver, 
RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+        FlushLifeCycleTracker tracker) throws IOException {
+      if (TRACKER != null) {
+        assertSame(tracker, TRACKER);
+      }
+    }
+
+    @Override
+    public InternalScanner 
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        InternalScanner scanner, FlushLifeCycleTracker tracker) throws 
IOException {
+      if (TRACKER != null) {
+        assertSame(tracker, TRACKER);
+      }
+      return scanner;
+    }
+
+    @Override
+    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+        FlushLifeCycleTracker tracker) throws IOException {
+      if (TRACKER != null) {
+        assertSame(tracker, TRACKER);
+      }
+    }
+
+    @Override
+    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, 
Store store,
+        StoreFile resultFile, FlushLifeCycleTracker tracker) throws 
IOException {
+      if (TRACKER != null) {
+        assertSame(tracker, TRACKER);
+      }
+      // inject here so we can make a flush request to fail because of we 
already have a flush
+      // ongoing.
+      CountDownLatch arrive = ARRIVE;
+      if (arrive != null) {
+        arrive.countDown();
+      }
+      try {
+        BLOCK.await();
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException();
+      }
+    }
+  }
+
+  private static final class Tracker implements FlushLifeCycleTracker {
+
+    private String reason;
+
+    private boolean beforeExecutionCalled;
+
+    private boolean afterExecutionCalled;
+
+    private boolean completed = false;
+
+    @Override
+    public synchronized void notExecuted(String reason) {
+      this.reason = reason;
+      completed = true;
+      notifyAll();
+    }
+
+    @Override
+    public void beforeExecution() {
+      this.beforeExecutionCalled = true;
+    }
+
+    @Override
+    public synchronized void afterExecution() {
+      this.afterExecutionCalled = true;
+      completed = true;
+      notifyAll();
+    }
+
+    public synchronized void await() throws InterruptedException {
+      while (!completed) {
+        wait();
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    UTIL.getAdmin()
+        .createTable(TableDescriptorBuilder.newBuilder(NAME)
+            .addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF))
+            .addCoprocessor(FlushObserver.class.getName()).build());
+    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    region = null;
+    TRACKER = null;
+    UTIL.deleteTable(NAME);
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    try (Table table = UTIL.getConnection().getTable(NAME)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addImmutable(CF, QUALIFIER, 
Bytes.toBytes(i)));
+      }
+    }
+    Tracker tracker = new Tracker();
+    TRACKER = tracker;
+    region.requestFlush(tracker);
+    tracker.await();
+    assertNull(tracker.reason);
+    assertTrue(tracker.beforeExecutionCalled);
+    assertTrue(tracker.afterExecutionCalled);
+
+    // request flush on a region with empty memstore should still success
+    tracker = new Tracker();
+    TRACKER = tracker;
+    region.requestFlush(tracker);
+    tracker.await();
+    assertNull(tracker.reason);
+    assertTrue(tracker.beforeExecutionCalled);
+    assertTrue(tracker.afterExecutionCalled);
+  }
+
+  @Test
+  public void testNotExecuted() throws IOException, InterruptedException {
+    try (Table table = UTIL.getConnection().getTable(NAME)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addImmutable(CF, QUALIFIER, 
Bytes.toBytes(i)));
+      }
+    }
+    // here we may have overlap when calling the CP hooks so we do not assert 
on TRACKER
+    Tracker tracker1 = new Tracker();
+    ARRIVE = new CountDownLatch(1);
+    BLOCK = new CountDownLatch(1);
+    region.requestFlush(tracker1);
+    ARRIVE.await();
+
+    Tracker tracker2 = new Tracker();
+    region.requestFlush(tracker2);
+    tracker2.await();
+    assertNotNull(tracker2.reason);
+    assertFalse(tracker2.beforeExecutionCalled);
+    assertFalse(tracker2.afterExecutionCalled);
+
+    BLOCK.countDown();
+    tracker1.await();
+    assertNull(tracker1.reason);
+    assertTrue(tracker1.beforeExecutionCalled);
+    assertTrue(tracker1.afterExecutionCalled);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
index 0eed449..12fdb77 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
@@ -1,25 +1,34 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required by applicable
- * law or agreed to in writing, software distributed under the License is 
distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License
- * for the specific language governing permissions and limitations under the 
License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
-import org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -27,7 +36,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
-@Category({RegionServerTests.class, MediumTests.class})
+@Category({ RegionServerTests.class, MediumTests.class })
 public class TestFlushRegionEntry {
   @Rule
   public TestName name = new TestName();
@@ -46,15 +55,15 @@ public class TestFlushRegionEntry {
 
   @Test
   public void testFlushRegionEntryEquality() {
-    HRegionInfo hri = new HRegionInfo(1, 
TableName.valueOf(name.getMethodName()), 0);
+    RegionInfo hri = 
RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
+        .setRegionId(1).setReplicaId(0).build();
     HRegion r = mock(HRegion.class);
     doReturn(hri).when(r).getRegionInfo();
 
-    FlushRegionEntry entry = new FlushRegionEntry(r, true);
-    FlushRegionEntry other = new FlushRegionEntry(r, true);
+    FlushRegionEntry entry = new FlushRegionEntry(r, true, 
FlushLifeCycleTracker.DUMMY);
+    FlushRegionEntry other = new FlushRegionEntry(r, true, 
FlushLifeCycleTracker.DUMMY);
 
     assertEquals(entry.hashCode(), other.hashCode());
     assertEquals(entry, other);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index 38d038f..95efa80 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -482,7 +482,7 @@ public class TestHMobStore {
    * @throws IOException
    */
   private static void flushStore(HMobStore store, long id) throws IOException {
-    StoreFlushContext storeFlushCtx = store.createFlushContext(id);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id, 
FlushLifeCycleTracker.DUMMY);
     storeFlushCtx.prepare();
     storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 24e42bb..41bd997 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -293,7 +293,7 @@ public class TestHRegion {
     put.addColumn(COLUMN_FAMILY_BYTES, null, value);
     // First put something in current memstore, which will be in snapshot 
after flusher.prepare()
     region.put(put);
-    StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, 
FlushLifeCycleTracker.DUMMY);
     storeFlushCtx.prepare();
     // Second put something in current memstore
     put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
@@ -337,7 +337,7 @@ public class TestHRegion {
     HStore store = region.getStore(COLUMN_FAMILY_BYTES);
     // Get some random bytes.
     byte [] value = Bytes.toBytes(method);
-    faultyLog.setStoreFlushCtx(store.createFlushContext(12345));
+    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, 
FlushLifeCycleTracker.DUMMY));
 
     Put put = new Put(value);
     put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
@@ -400,8 +400,8 @@ public class TestHRegion {
     // save normalCPHost and replaced by mockedCPHost, which will cancel flush 
requests
     RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
     RegionCoprocessorHost mockedCPHost = 
Mockito.mock(RegionCoprocessorHost.class);
-    when(mockedCPHost.preFlush(Mockito.isA(HStore.class), 
Mockito.isA(InternalScanner.class))).
-      thenReturn(null);
+    when(mockedCPHost.preFlush(Mockito.isA(HStore.class), 
Mockito.isA(InternalScanner.class),
+      Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null);
     region.setCoprocessorHost(mockedCPHost);
     region.put(put);
     region.flush(true);
@@ -567,7 +567,8 @@ public class TestHRegion {
           region.put(p1);
           // Manufacture an outstanding snapshot -- fake a failed flush by 
doing prepare step only.
           HStore store = region.getStore(COLUMN_FAMILY_BYTES);
-          StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
+          StoreFlushContext storeFlushCtx =
+              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
           storeFlushCtx.prepare();
           // Now add two entries to the foreground memstore.
           Put p2 = new Put(row);
@@ -5626,7 +5627,7 @@ public class TestHRegion {
       Put put = new Put(Bytes.toBytes("19998"));
       put.addColumn(cf1, col, Bytes.toBytes("val"));
       region.put(put);
-      region.flushcache(true, true);
+      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
       Put put2 = new Put(Bytes.toBytes("19997"));
       put2.addColumn(cf1, col, Bytes.toBytes("val"));
       region.put(put2);
@@ -5642,7 +5643,7 @@ public class TestHRegion {
         p.addColumn(cf1, col, Bytes.toBytes("" + i));
         region.put(p);
       }
-      region.flushcache(true, true);
+      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
 
       // create one memstore contains many rows will be skipped
       // to check MemStoreScanner.seekToPreviousRow
@@ -5689,7 +5690,7 @@ public class TestHRegion {
       RegionScanner scanner = region.getScanner(scan);
 
       // flush the cache. This will reset the store scanner
-      region.flushcache(true, true);
+      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
 
       // create one memstore contains many rows will be skipped
       // to check MemStoreScanner.seekToPreviousRow

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 22b5064..71a9f5c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -1207,13 +1207,15 @@ public class TestHRegionReplayEvents {
   @Test
   public void testWriteFlushRequestMarker() throws IOException {
     // primary region is empty at this point. Request a flush with 
writeFlushRequestWalMarker=false
-    FlushResultImpl result = 
(FlushResultImpl)((HRegion)primaryRegion).flushcache(true, false);
+    FlushResultImpl result = (FlushResultImpl) ((HRegion) 
primaryRegion).flushcache(true, false,
+      FlushLifeCycleTracker.DUMMY);
     assertNotNull(result);
     assertEquals(result.result, 
FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
     assertFalse(result.wroteFlushWalMarker);
 
     // request flush again, but this time with writeFlushRequestWalMarker = 
true
-    result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, true);
+    result = (FlushResultImpl) ((HRegion) primaryRegion).flushcache(true, true,
+      FlushLifeCycleTracker.DUMMY);
     assertNotNull(result);
     assertEquals(result.result, 
FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
     assertTrue(result.wroteFlushWalMarker);
@@ -1248,7 +1250,7 @@ public class TestHRegionReplayEvents {
 
     // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming 
this came from
     // triggered flush restores readsEnabled
-    primaryRegion.flushcache(true, true);
+    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
     reader = createWALReaderForPrimary();
     while (true) {
       WAL.Entry entry = reader.next();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca79a915/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index b9054f4..1382603 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -769,7 +769,7 @@ public class TestHStore {
   }
 
   private static void flushStore(HStore store, long id) throws IOException {
-    StoreFlushContext storeFlushCtx = store.createFlushContext(id);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id, 
FlushLifeCycleTracker.DUMMY);
     storeFlushCtx.prepare();
     storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
@@ -1081,7 +1081,7 @@ public class TestHStore {
       seqId = Math.max(seqId, c.getSequenceId());
     }
     inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
-    StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, 
FlushLifeCycleTracker.DUMMY);
     storeFlushCtx.prepare();
     inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
     int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 
: 2;
@@ -1287,7 +1287,7 @@ public class TestHStore {
     quals.add(qf1);
     quals.add(qf2);
     quals.add(qf3);
-    StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, 
FlushLifeCycleTracker.DUMMY);
     MyCompactingMemStore.START_TEST.set(true);
     Runnable flush = () -> {
       // this is blocked until we create first scanner from pipeline and 
snapshot -- phase (1/5)
@@ -1363,7 +1363,7 @@ public class TestHStore {
     myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
     long snapshotId = id++;
     // push older data into snapshot -- phase (1/4)
-    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, 
FlushLifeCycleTracker.DUMMY);
     storeFlushCtx.prepare();
 
     // insert current data into active -- phase (2/4)
@@ -1475,7 +1475,7 @@ public class TestHStore {
     store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
     store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
     assertEquals(1, 
MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
-    StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, 
FlushLifeCycleTracker.DUMMY);
     storeFlushCtx.prepare();
     // This shouldn't invoke another in-memory flush because the first 
compactor thread
     // hasn't accomplished the in-memory compaction.

Reply via email to