HBASE-18358 Backport HBASE-18099 'FlushSnapshotSubprocedure should wait for concurrent Region#flush() to finish'
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c0f743e4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c0f743e4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c0f743e4

Branch: refs/heads/HBASE-14070.HLC
Commit: c0f743e44f3e9ec8095d983215555bf8558a6ec1
Parents: cc4301c
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Jul 11 17:26:22 2017 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Tue Jul 11 17:26:22 2017 -0700

----------------------------------------------------------------------
 .../snapshot/FlushSnapshotSubprocedure.java | 30 ++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/c0f743e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
index 22df895..b30d622 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver.snapshot;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -24,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
@@ -52,6 +54,9 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
   private final SnapshotSubprocedurePool taskManager;
   private boolean snapshotSkipFlush = false;
 
+  // the maximum number of attempts we flush
+  final static int MAX_RETRIES = 3;
+
   public FlushSnapshotSubprocedure(ProcedureMember member,
       ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
       List<Region> regions, SnapshotDescription snapshot,
@@ -96,11 +101,26 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
           LOG.debug("take snapshot without flush memstore first");
         } else {
           LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
-          FlushResult res = region.flush(true);
-          if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
-            // CANNOT_FLUSH may mean that a flush is already on-going
-            // we need to wait for that flush to complete
-            region.waitForFlushes();
+          boolean succeeded = false;
+          long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
+          for (int i = 0; i < MAX_RETRIES; i++) {
+            FlushResult res = region.flush(true);
+            if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
+              // CANNOT_FLUSH may mean that a flush is already on-going
+              // we need to wait for that flush to complete
+              region.waitForFlushes();
+              if (region.getMaxFlushedSeqId() >= readPt) {
+                // writes at the start of the snapshot have been persisted
+                succeeded = true;
+                break;
+              }
+            } else {
+              succeeded = true;
+              break;
+            }
+          }
+          if (!succeeded) {
+            throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
           }
         }
         ((HRegion)region).addRegionToSnapshot(snapshot, monitor);