Fix archiving of Procedure v2 (pv2) WAL files
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a8dfa377 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a8dfa377 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a8dfa377 Branch: refs/heads/HBASE-14614 Commit: a8dfa377caedcd81a55fed7169e18fa062b5936f Parents: 95c6180 Author: Michael Stack <st...@apache.org> Authored: Fri May 12 13:02:32 2017 -0700 Committer: Michael Stack <st...@apache.org> Committed: Tue May 23 00:33:03 2017 -0700 ---------------------------------------------------------------------- .../procedure2/store/wal/ProcedureWALFile.java | 28 ++++++++++---------- .../procedure2/store/wal/WALProcedureStore.java | 28 +++++++++++++------- .../org/apache/hadoop/hbase/master/HMaster.java | 4 ++- .../assignment/TestAssignmentManager.java | 2 +- 4 files changed, 36 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a8dfa377/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 2221cfc..42abe8f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; 
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; @@ -156,22 +155,23 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> { this.logSize += size; } - public void removeFile() throws IOException { + public void removeFile(final Path walArchiveDir) throws IOException { close(); - // TODO: FIX THIS. MAKE THIS ARCHIVE FORMAL. - Path archiveDir = - new Path(logFile.getParent().getParent(), HConstants.HFILE_ARCHIVE_DIRECTORY); - try { - fs.mkdirs(archiveDir); - } catch (IOException ioe) { - LOG.warn("Making " + archiveDir, ioe); + boolean archived = false; + if (walArchiveDir != null) { + Path archivedFile = new Path(walArchiveDir, logFile.getName()); + LOG.info("ARCHIVED (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + walArchiveDir); + if (!fs.rename(logFile, archivedFile)) { + LOG.warn("Failed archive of " + logFile + ", deleting"); + } else { + archived = true; + } } - Path archivedFile = new Path(archiveDir, logFile.getName()); - LOG.info("ARCHIVED WAL (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) 
" + logFile + " to " + archivedFile); - if (!fs.rename(logFile, archivedFile)) { - LOG.warn("Failed archive of " + logFile); + if (!archived) { + if (!fs.delete(logFile, false)) { + LOG.warn("Failed delete of " + logFile); + } } - // fs.delete(logFile, false); } public void setProcIds(long minId, long maxId) { http://git-wip-us.apache.org/repos/asf/hbase/blob/a8dfa377/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 300e023..df818fe 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -124,6 +124,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private final Configuration conf; private final FileSystem fs; private final Path walDir; + private final Path walArchiveDir; private final AtomicReference<Throwable> syncException = new AtomicReference<>(); private final AtomicBoolean loading = new AtomicBoolean(true); @@ -185,9 +186,15 @@ public class WALProcedureStore extends ProcedureStoreBase { public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, final LeaseRecovery leaseRecovery) { + this(conf, fs, walDir, null, leaseRecovery); + } + + public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, + final Path walArchiveDir, final LeaseRecovery leaseRecovery) { this.fs = fs; this.conf = conf; this.walDir = walDir; + this.walArchiveDir = walArchiveDir; this.leaseRecovery = leaseRecovery; } @@ -343,7 +350,7 @@ public class WALProcedureStore extends ProcedureStoreBase { if (LOG.isDebugEnabled()) { 
LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); } - logs.getLast().removeFile(); + logs.getLast().removeFile(this.walArchiveDir); continue; } @@ -955,7 +962,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // but we should check if someone else has created new files if (getMaxLogId(getLogFiles()) > flushLogId) { LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId); - logs.getLast().removeFile(); + logs.getLast().removeFile(this.walArchiveDir); return false; } @@ -1047,7 +1054,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. // once there is nothing olding the oldest WAL we can remove it. while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { - removeLogFile(logs.getFirst()); + removeLogFile(logs.getFirst(), walArchiveDir); buildHoldingCleanupTracker(); } @@ -1089,7 +1096,7 @@ public class WALProcedureStore extends ProcedureStoreBase { if (lastLogId < log.getLogId()) { break; } - removeLogFile(log); + removeLogFile(log, walArchiveDir); removed = true; } @@ -1098,12 +1105,12 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private boolean removeLogFile(final ProcedureWALFile log) { + private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { try { if (LOG.isTraceEnabled()) { LOG.trace("Removing log=" + log); } - log.removeFile(); + log.removeFile(walArchiveDir); logs.remove(log); if (LOG.isDebugEnabled()) { LOG.info("Removed log=" + log + " activeLogs=" + logs); @@ -1192,7 +1199,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); - ProcedureWALFile log = initOldLog(logFiles[i]); + ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); if (log != null) { this.logs.add(log); } @@ -1222,11 +1229,12 @@ public class WALProcedureStore extends 
ProcedureStoreBase { /** * Loads given log file and it's tracker. */ - private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException { + private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) + throws IOException { final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); if (logFile.getLen() == 0) { LOG.warn("Remove uninitialized log: " + logFile); - log.removeFile(); + log.removeFile(walArchiveDir); return null; } if (LOG.isDebugEnabled()) { @@ -1236,7 +1244,7 @@ public class WALProcedureStore extends ProcedureStoreBase { log.open(); } catch (ProcedureWALFormat.InvalidWALDataException e) { LOG.warn("Remove uninitialized log: " + logFile, e); - log.removeFile(); + log.removeFile(walArchiveDir); return null; } catch (IOException e) { String msg = "Unable to read state log: " + logFile; http://git-wip-us.apache.org/repos/asf/hbase/blob/a8dfa377/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 06e05c3..6796d59 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1163,6 +1163,8 @@ public class HMaster extends HRegionServer implements MasterServices { final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); final Path walDir = new Path(FSUtils.getWALRootDir(this.conf), MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); + final Path archiveWalDir = new Path(new Path(FSUtils.getWALRootDir(this.conf), + HConstants.HFILE_ARCHIVE_DIRECTORY), MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); final FileSystem walFs = walDir.getFileSystem(conf); @@ -1176,7 +1178,7 @@ public class HMaster extends HRegionServer implements MasterServices { 
FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); - procedureStore = new WALProcedureStore(conf, walFs, walDir, + procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, archiveWalDir, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a8dfa377/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java index 9afb63f..dda41e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java @@ -121,7 +121,7 @@ public class TestAssignmentManager { conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10); conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS); conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000); - conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 5); + conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually. } @Before