HBASE-20734 Colocate recovered edits directory with hbase.wal.dir

Signed-off-by: Andrew Purtell <apurt...@apache.org>
Signed-off-by: Reid Chan <reidc...@apache.org>
Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9675ad38 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9675ad38 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9675ad38 Branch: refs/heads/branch-1.3 Commit: 9675ad387627588bdf0c37f56f701a0d0074d196 Parents: 1d362f6 Author: Zach York <zy...@apache.org> Authored: Wed Jun 27 16:18:53 2018 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Dec 12 19:22:42 2018 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HRegion.java | 190 ++++++++++++------- .../org/apache/hadoop/hbase/util/FSUtils.java | 27 +++ .../apache/hadoop/hbase/wal/WALSplitter.java | 166 ++++++++-------- .../hadoop/hbase/regionserver/TestHRegion.java | 8 +- .../hbase/regionserver/TestRecoveredEdits.java | 2 +- .../hbase/regionserver/wal/TestWALReplay.java | 6 +- .../apache/hadoop/hbase/wal/TestWALFactory.java | 2 +- .../apache/hadoop/hbase/wal/TestWALSplit.java | 50 ++--- 8 files changed, 263 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9675ad38/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9d8c0c6..d41e7cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -41,6 +41,7 @@ import java.util.NavigableSet; import java.util.RandomAccess; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; 
import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -309,6 +310,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final int rowLockWaitDuration; static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; + private Path regionDir; + private FileSystem walFS; + // The internal wait duration to acquire a lock before read/update // from the region. It is not per row. The purpose of this wait time // is to avoid waiting a long time while the region is busy, so that @@ -840,7 +844,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { // Recover any edits if available. maxSeqId = Math.max(maxSeqId, - replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status)); // Make sure mvcc is up to max. this.mvcc.advanceTo(maxSeqId); } @@ -884,8 +888,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // is opened before recovery completes. So we add a safety bumper to avoid new sequence number // overlaps used sequence numbers if (this.writestate.writesEnabled) { - nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs - .getRegionDir(), nextSeqid, (this.recovering ? (this.flushPerChanges + 10000000) : 1)); + nextSeqid = WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), + nextSeqid, (this.recovering ? 
(this.flushPerChanges + 10000000) : 1)); } else { nextSeqid++; } @@ -1023,11 +1027,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getRegionServerServices().getServerName(), storeFiles); WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc); - // Store SeqId in HDFS when a region closes + // Store SeqId in WAL FileSystem when a region closes // checking region folder exists is due to many tests which delete the table folder while a // table is still online - if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { - WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), + if (getWalFileSystem().exists(getWALRegionDir())) { + WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), mvcc.getReadPoint(), 0); } } @@ -1702,6 +1706,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return this.fs; } + /** @return the WAL {@link HRegionFileSystem} used by this region */ + HRegionFileSystem getRegionWALFileSystem() throws IOException { + return new HRegionFileSystem(conf, getWalFileSystem(), + FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo()); + } + + /** @return WAL {@link FileSystem} being used by this region */ + FileSystem getWalFileSystem() throws IOException { + if (walFS == null) { + walFS = FSUtils.getWALFileSystem(conf); + } + return walFS; + } + + /** + * @return the Region Directory under the WALRootDir + * @throws IOException if there is an error getting WALRootDir + */ + @VisibleForTesting + public Path getWALRegionDir() throws IOException { + if (regionDir == null) { + regionDir = FSUtils.getWALRegionDir(conf, fs.getRegionInfo()); + } + return regionDir; + } + @Override public long getEarliestFlushTimeForAllStores() { return Collections.min(lastStoreFlushTimeMap.values()); @@ -4106,8 +4136,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi * recovered edits log or <code>minSeqId</code> if nothing added from editlogs. * @throws IOException */ - protected long replayRecoveredEditsIfAny(final Path regiondir, - Map<byte[], Long> maxSeqIdInStores, + protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, final MonitoredTask status) throws IOException { long minSeqIdForTheRegion = -1; @@ -4118,30 +4147,92 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } long seqid = minSeqIdForTheRegion; - FileSystem fs = this.fs.getFileSystem(); - NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); + FileSystem walFS = getWalFileSystem(); + Path regionDir = getWALRegionDir(); + FileSystem rootFS = getFilesystem(); + Path defaultRegionDir = getRegionDir(FSUtils.getRootDir(conf), getRegionInfo()); + + // This is to ensure backwards compatability with HBASE-20723 where recovered edits can appear + // under the root dir even if walDir is set. + NavigableSet<Path> filesUnderRootDir = null; + if (!regionDir.equals(defaultRegionDir)) { + filesUnderRootDir = + WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir); + seqid = Math.max(seqid, + replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter, + defaultRegionDir)); + } + NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir); + seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS, + files, reporter, regionDir)); + + if (seqid > minSeqIdForTheRegion) { + // Then we added some edits to memory. Flush and cleanup split edit files. + internalFlushcache(null, seqid, stores.values(), status, false); + } + // Now delete the content of recovered edits. We're done w/ them. + if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { + // For debugging data loss issues! 
+ // If this flag is set, make use of the hfile archiving by making recovered.edits a fake + // column family. Have to fake out file type too by casting our recovered.edits as storefiles + String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName(); + Set<StoreFile> fakeStoreFiles = new HashSet<>(files.size()); + for (Path file: files) { + fakeStoreFiles.add( + new StoreFile(walFS, file, this.conf, null, null)); + } + getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); + } else { + if (filesUnderRootDir != null) { + for (Path file : filesUnderRootDir) { + if (!rootFS.delete(file, false)) { + LOG.error("Failed delete of {} under root directory." + file); + } else { + LOG.debug("Deleted recovered.edits root directory file=" + file); + } + } + } + for (Path file: files) { + if (!walFS.delete(file, false)) { + LOG.error("Failed delete of " + file); + } else { + LOG.debug("Deleted recovered.edits file=" + file); + } + } + } + return seqid; + } + + private long replayRecoveredEditsForPaths(long minSeqIdForTheRegion, FileSystem fs, + final NavigableSet<Path> files, final CancelableProgressable reporter, final Path regionDir) + throws IOException { + long seqid = minSeqIdForTheRegion; if (LOG.isDebugEnabled()) { - LOG.debug("Found " + (files == null ? 0 : files.size()) - + " recovered edits file(s) under " + regiondir); + LOG.debug("Found " + (files == null ? 
0 : files.size()) + + " recovered edits file(s) under " + regionDir); } - if (files == null || files.isEmpty()) return seqid; + if (files == null || files.isEmpty()) { + return seqid; + } - for (Path edits: files) { - if (edits == null || !fs.exists(edits)) { + for (Path edits : files) { + if (edits == null || !walFS.exists(edits)) { LOG.warn("Null or non-existent edits file: " + edits); continue; } - if (isZeroLengthThenDelete(fs, edits)) continue; + if (isZeroLengthThenDelete(walFS, edits)) { + continue; + } long maxSeqId; String fileName = edits.getName(); maxSeqId = Math.abs(Long.parseLong(fileName)); if (maxSeqId <= minSeqIdForTheRegion) { if (LOG.isDebugEnabled()) { - String msg = "Maximum sequenceid for this wal is " + maxSeqId - + " and minimum sequenceid for the region is " + minSeqIdForTheRegion - + ", skipped the whole file, path=" + edits; + String msg = "Maximum sequenceid for this wal is " + maxSeqId + + " and minimum sequenceid for the region is " + minSeqIdForTheRegion + + ", skipped the whole file, path=" + edits; LOG.debug(msg); } continue; @@ -4150,77 +4241,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { // replay the edits. Replay can return -1 if everything is skipped, only update // if seqId is greater - seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter)); + seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter, fs)); } catch (IOException e) { - boolean skipErrors = conf.getBoolean( - HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, - conf.getBoolean( - "hbase.skip.errors", + boolean skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, + conf.getBoolean("hbase.skip.errors", HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS)); if (conf.get("hbase.skip.errors") != null) { - LOG.warn( - "The property 'hbase.skip.errors' has been deprecated. Please use " + + LOG.warn("The property 'hbase.skip.errors' has been deprecated. 
Please use " + HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead."); } if (skipErrors) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); - LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS - + "=true so continuing. Renamed " + edits + - " as " + p, e); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); + LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + + "=true so continuing. Renamed " + edits + " as " + p, e); } else { throw e; } } } - // The edits size added into rsAccounting during this replaying will not - // be required any more. So just clear it. - if (this.rsAccounting != null) { - this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName()); - } - if (seqid > minSeqIdForTheRegion) { - // Then we added some edits to memory. Flush and cleanup split edit files. - internalFlushcache(null, seqid, stores.values(), status, false); - } - // Now delete the content of recovered edits. We're done w/ them. - if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { - // For debugging data loss issues! - // If this flag is set, make use of the hfile archiving by making recovered.edits a fake - // column family. Have to fake out file type too by casting our recovered.edits as storefiles - String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName(); - Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size()); - for (Path file: files) { - fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, - null, null)); - } - getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); - } else { - for (Path file: files) { - if (!fs.delete(file, false)) { - LOG.error("Failed delete of " + file); - } else { - LOG.debug("Deleted recovered.edits file=" + file); - } - } - } return seqid; } - /* + /** * @param edits File of recovered edits. * @param maxSeqIdInStores Maximum sequenceid found in each store. 
Edits in wal - * must be larger than this to be replayed for each store. - * @param reporter + * must be larger than this to be replayed for each store. + * @param reporter CacelableProgressable reporter * @return the sequence id of the last edit added to this region out of the * recovered edits log or <code>minSeqId</code> if nothing added from editlogs. * @throws IOException */ - private long replayRecoveredEdits(final Path edits, - Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter) + private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdInStores, + final CancelableProgressable reporter, final FileSystem fs) throws IOException { String msg = "Replaying edits from " + edits; LOG.info(msg); MonitoredTask status = TaskMonitor.get().createStatus(msg); - FileSystem fs = this.fs.getFileSystem(); status.setStatus("Opening recovered edits"); WAL.Reader reader = null; @@ -8174,7 +8230,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 48 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); http://git-wip-us.apache.org/repos/asf/hbase/blob/9675ad38/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 5784b74..4afb1e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -1060,6 +1060,20 @@ public abstract class FSUtils { } /** + * Returns the WAL region directory based on the region info + * @param conf configuration to determine WALRootDir + * @param 
regionInfo used to get region and table + * @return the region directory used to store WALs under the WALRootDir + * @throws IOException if there is an exception determining the WALRootDir + */ + public static Path getWALRegionDir(final Configuration conf, + final HRegionInfo regionInfo) + throws IOException { + return new Path(getWALTableDir(conf, regionInfo.getTable()), + regionInfo.getEncodedName()); + } + + /** * Checks if meta region exists * * @param fs file system @@ -1195,6 +1209,19 @@ public abstract class FSUtils { } /** + * Returns the Table directory under the WALRootDir for the specified table name + * @param conf configuration used to get the WALRootDir + * @param tableName Table to get the directory for + * @return a path to the WAL table directory for the specified table + * @throws IOException if there is an exception determining the WALRootDir + */ + public static Path getWALTableDir(final Configuration conf, final TableName tableName) + throws IOException { + return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()), + tableName.getQualifierAsString()); + } + + /** * Returns the {@link org.apache.hadoop.hbase.TableName} object representing * the table directory under * path rootdir http://git-wip-us.apache.org/repos/asf/hbase/blob/9675ad38/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 45dbb11..47a5596 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -141,7 +141,7 @@ public class WALSplitter { // Parameters for split process protected final Path walDir; - protected final FileSystem fs; + protected final FileSystem walFS; protected final Configuration 
conf; // Major subcomponents of the split process. @@ -188,14 +188,14 @@ public class WALSplitter { @VisibleForTesting WALSplitter(final WALFactory factory, Configuration conf, Path walDir, - FileSystem fs, LastSequenceId idChecker, + FileSystem walFS, LastSequenceId idChecker, CoordinatedStateManager csm, RecoveryMode mode) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.walDir = walDir; - this.fs = fs; + this.walFS = walFS; this.sequenceIdChecker = idChecker; this.csm = (BaseCoordinatedStateManager)csm; this.walFactory = factory; @@ -236,7 +236,7 @@ public class WALSplitter { * <p> * @param rootDir * @param logfile - * @param fs + * @param walFS FileSystem to use for WAL reading and splitting * @param conf * @param reporter * @param idChecker @@ -244,10 +244,10 @@ public class WALSplitter { * @return false if it is interrupted by the progress-able. 
* @throws IOException */ - public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem fs, + public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException { - WALSplitter s = new WALSplitter(factory, conf, walDir, fs, idChecker, cp, mode); + WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, cp, mode); return s.splitLogFile(logfile, reporter); } @@ -315,7 +315,7 @@ public class WALSplitter { in = getReader(logfile, skipErrors, reporter); } catch (CorruptedLogFileException e) { LOG.warn("Could not get reader, corrupted log file " + logPath, e); - ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), fs); + ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS); isCorrupted = true; } if (in == null) { @@ -407,7 +407,7 @@ public class WALSplitter { } catch (CorruptedLogFileException e) { LOG.warn("Could not parse, corrupted log file " + logPath, e); csm.getSplitLogWorkerCoordination().markCorrupted(walDir, - logfile.getPath().getName(), fs); + logfile.getPath().getName(), walFS); isCorrupted = true; } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); @@ -455,31 +455,30 @@ public class WALSplitter { */ public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { - Path rootdir = FSUtils.getWALRootDir(conf); - Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + Path walDir = FSUtils.getWALRootDir(conf); + Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); Path logPath; - if (FSUtils.isStartingWithPath(rootdir, logfile)) { + if (FSUtils.isStartingWithPath(walDir, logfile)) { logPath = new Path(logfile); } else { - logPath = new Path(rootdir, logfile); + logPath = new Path(walDir, logfile); } - finishSplitLogFile(rootdir, 
oldLogDir, logPath, conf); + finishSplitLogFile(walDir, oldLogDir, logPath, conf); } - private static void finishSplitLogFile(Path rootdir, Path oldLogDir, + private static void finishSplitLogFile(Path walDir, Path oldLogDir, Path logPath, Configuration conf) throws IOException { List<Path> processedLogs = new ArrayList<Path>(); List<Path> corruptedLogs = new ArrayList<Path>(); - FileSystem fs; - fs = rootdir.getFileSystem(conf); - if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) { + FileSystem walFS = walDir.getFileSystem(conf); + if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) { corruptedLogs.add(logPath); } else { processedLogs.add(logPath); } - archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); - Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName()); - fs.delete(stagingDir, true); + archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf); + Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName()); + walFS.delete(stagingDir, true); } /** @@ -490,28 +489,28 @@ public class WALSplitter { * @param corruptedLogs * @param processedLogs * @param oldLogDir - * @param fs + * @param walFS FileSystem to use for WAL archival * @param conf * @throws IOException */ private static void archiveLogs( final List<Path> corruptedLogs, final List<Path> processedLogs, final Path oldLogDir, - final FileSystem fs, final Configuration conf) throws IOException { + final FileSystem walFS, final Configuration conf) throws IOException { final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), conf.get( "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME)); - if (!fs.mkdirs(corruptDir)) { + if (!walFS.mkdirs(corruptDir)) { LOG.info("Unable to mkdir " + corruptDir); } - fs.mkdirs(oldLogDir); + walFS.mkdirs(oldLogDir); // this method can get restarted or called multiple times for archiving // the same log files. 
for (Path corrupted : corruptedLogs) { Path p = new Path(corruptDir, corrupted.getName()); - if (fs.exists(corrupted)) { - if (!fs.rename(corrupted, p)) { + if (walFS.exists(corrupted)) { + if (!walFS.rename(corrupted, p)) { LOG.warn("Unable to move corrupted log " + corrupted + " to " + p); } else { LOG.warn("Moved corrupted log " + corrupted + " to " + p); @@ -521,8 +520,8 @@ public class WALSplitter { for (Path p : processedLogs) { Path newPath = FSHLog.getWALArchivePath(oldLogDir, p); - if (fs.exists(p)) { - if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { + if (walFS.exists(p)) { + if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) { LOG.warn("Unable to move " + p + " to " + newPath); } else { LOG.info("Archived processed log " + p + " to " + newPath); @@ -548,35 +547,28 @@ public class WALSplitter { @VisibleForTesting static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); - Path rootDir = FSUtils.getRootDir(conf); - Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename()); + FileSystem walFS = FSUtils.getWALFileSystem(conf); + Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTablename()); String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName()); - Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName); - Path dir = getRegionDirRecoveredEditsDir(regiondir); + Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName); + Path dir = getRegionDirRecoveredEditsDir(regionDir); - if (!fs.exists(regiondir)) { - LOG.info("This region's directory doesn't exist: " - + regiondir.toString() + ". 
It is very likely that it was" + - " already split so it's safe to discard those edits."); - return null; - } - if (fs.exists(dir) && fs.isFile(dir)) { + if (walFS.exists(dir) && walFS.isFile(dir)) { Path tmp = new Path(tmpDirName); - if (!fs.exists(tmp)) { - fs.mkdirs(tmp); + if (!walFS.exists(tmp)) { + walFS.mkdirs(tmp); } tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); LOG.warn("Found existing old file: " + dir + ". It could be some " + "leftover of an old installation. It should be a folder instead. " + "So moving it to " + tmp); - if (!fs.rename(dir, tmp)) { + if (!walFS.rename(dir, tmp)) { LOG.warn("Failed to sideline old file " + dir); } } - if (!fs.exists(dir) && !fs.mkdirs(dir)) { + if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { LOG.warn("mkdir failed on " + dir); } // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. @@ -614,31 +606,32 @@ public class WALSplitter { private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; /** - * @param regiondir + * @param regionDir * This regions directory in the filesystem. * @return The directory that holds recovered edits files for the region - * <code>regiondir</code> + * <code>regionDir</code> */ - public static Path getRegionDirRecoveredEditsDir(final Path regiondir) { - return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR); + public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { + return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); } /** * Returns sorted set of edit files made by splitter, excluding files * with '.temp' suffix. * - * @param fs - * @param regiondir - * @return Files in passed <code>regiondir</code> as a sorted set. + * @param walFS FileSystem to use for reading Recovered edits files + * @param regionDir Directory where Recovered edits should reside + * @return Files in passed <code>regionDir</code> as a sorted set. 
* @throws IOException */ - public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs, - final Path regiondir) throws IOException { + public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS, + final Path regionDir) throws IOException { NavigableSet<Path> filesSorted = new TreeSet<Path>(); - Path editsdir = getRegionDirRecoveredEditsDir(regiondir); - if (!fs.exists(editsdir)) + Path editsdir = getRegionDirRecoveredEditsDir(regionDir); + if (!walFS.exists(editsdir)) { return filesSorted; - FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + } + FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { @Override public boolean accept(Path p) { boolean result = false; @@ -648,7 +641,7 @@ public class WALSplitter { // In particular, on error, we'll move aside the bad edit file giving // it a timestamp suffix. See moveAsideBadEditsFile. Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); - result = fs.isFile(p) && m.matches(); + result = walFS.isFile(p) && m.matches(); // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, // because it means splitwal thread is writting this file. if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { @@ -676,17 +669,17 @@ public class WALSplitter { /** * Move aside a bad edits file. * - * @param fs + * @param walFS FileSystem to use for WAL operations * @param edits * Edits file to move aside. * @return The name of the moved aside file. * @throws IOException */ - public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) + public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits) throws IOException { Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." 
+ System.currentTimeMillis()); - if (!fs.rename(edits, moveAsideName)) { + if (!walFS.rename(edits, moveAsideName)) { LOG.warn("Rename failed from " + edits + " to " + moveAsideName); } return moveAsideName; @@ -707,21 +700,21 @@ public class WALSplitter { /** * Create a file with name as region open sequence id - * @param fs - * @param regiondir + * @param walFS FileSystem to write Sequence file to + * @param regionDir WALRegionDir used to determine where to write edits files * @param newSeqId * @param saftyBumper * @return long new sequence Id value * @throws IOException */ - public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir, + public static long writeRegionSequenceIdFile(final FileSystem walFS, final Path regionDir, long newSeqId, long saftyBumper) throws IOException { - Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); + Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); long maxSeqId = 0; FileStatus[] files = null; - if (fs.exists(editsdir)) { - files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + if (walFS.exists(editsdir)) { + files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { @Override public boolean accept(Path p) { return isSequenceIdFile(p); @@ -749,7 +742,7 @@ public class WALSplitter { Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX); if (newSeqId != maxSeqId) { try { - if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) { + if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { throw new IOException("Failed to create SeqId file:" + newSeqIdFile); } if (LOG.isDebugEnabled()) { @@ -766,7 +759,7 @@ public class WALSplitter { if (newSeqIdFile.equals(status.getPath())) { continue; } - fs.delete(status.getPath(), false); + walFS.delete(status.getPath(), false); } } return newSeqId; @@ -794,7 +787,7 @@ public class WALSplitter { } try { - FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, 
reporter); + FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter); try { in = getReader(path, reporter); } catch (EOFException e) { @@ -864,7 +857,7 @@ public class WALSplitter { */ protected Writer createWriter(Path logfile) throws IOException { - return walFactory.createRecoveredEditsWriter(fs, logfile); + return walFactory.createRecoveredEditsWriter(walFS, logfile); } /** @@ -872,7 +865,7 @@ public class WALSplitter { * @return new Reader instance, caller should close */ protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { - return walFactory.createReader(fs, curLogFile, reporter); + return walFactory.createReader(walFS, curLogFile, reporter); } /** @@ -1355,10 +1348,10 @@ public class WALSplitter { } // delete the one with fewer wal entries - void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Path dst) + void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException { long dstMinLogSeqNum = -1L; - try (WAL.Reader reader = walFactory.createReader(fs, dst)) { + try (WAL.Reader reader = walFactory.createReader(walFS, dst)) { WAL.Entry entry = reader.next(); if (entry != null) { dstMinLogSeqNum = entry.getKey().getLogSeqNum(); @@ -1373,15 +1366,15 @@ public class WALSplitter { if (wap.minLogSeqNum < dstMinLogSeqNum) { LOG.warn("Found existing old edits file. It could be the result of a previous failed" + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" - + fs.getFileStatus(dst).getLen()); - if (!fs.delete(dst, false)) { + + walFS.getFileStatus(dst).getLen()); + if (!walFS.delete(dst, false)) { LOG.warn("Failed deleting of old " + dst); throw new IOException("Failed deleting of old " + dst); } } else { LOG.warn("Found existing old edits file and we have less entries. 
Deleting " + wap.p - + ", length=" + rootFs.getFileStatus(wap.p).getLen()); - if (!rootFs.delete(wap.p, false)) { + + ", length=" + walFS.getFileStatus(wap.p).getLen()); + if (!walFS.delete(wap.p, false)) { LOG.warn("Failed deleting of " + wap.p); throw new IOException("Failed deleting of " + wap.p); } @@ -1465,7 +1458,7 @@ public class WALSplitter { if (LOG.isTraceEnabled()) { LOG.trace("Closing " + wap.p); } - FileSystem rootFs = FileSystem.get(conf); + try { wap.w.close(); } catch (IOException ioe) { @@ -1480,7 +1473,7 @@ public class WALSplitter { } if (wap.editsWritten == 0) { // just remove the empty recovered.edits file - if (rootFs.exists(wap.p) && !rootFs.delete(wap.p, false)) { + if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) { LOG.warn("Failed deleting empty " + wap.p); throw new IOException("Failed deleting empty " + wap.p); } @@ -1490,14 +1483,14 @@ public class WALSplitter { Path dst = getCompletedRecoveredEditsFilePath(wap.p, regionMaximumEditLogSeqNum.get(encodedRegionName)); try { - if (!dst.equals(wap.p) && rootFs.exists(dst)) { - deleteOneWithFewerEntries(rootFs, wap, dst); + if (!dst.equals(wap.p) && walFS.exists(dst)) { + deleteOneWithFewerEntries(wap, dst); } // Skip the unit tests which create a splitter that reads and // writes the data without touching disk. // TestHLogSplit#testThreading is an example. - if (rootFs.exists(wap.p)) { - if (!rootFs.rename(wap.p, dst)) { + if (walFS.exists(wap.p)) { + if (!walFS.rename(wap.p, dst)) { throw new IOException("Failed renaming " + wap.p + " to " + dst); } LOG.info("Rename " + wap.p + " to " + dst); @@ -1594,12 +1587,11 @@ public class WALSplitter { if (regionedits == null) { return null; } - FileSystem rootFs = FileSystem.get(conf); - if (rootFs.exists(regionedits)) { + if (walFS.exists(regionedits)) { LOG.warn("Found old edits file. It could be the " + "result of a previous failed split attempt. 
Deleting " + regionedits + ", length=" - + rootFs.getFileStatus(regionedits).getLen()); - if (!rootFs.delete(regionedits, false)) { + + walFS.getFileStatus(regionedits).getLen()); + if (!walFS.delete(regionedits, false)) { LOG.warn("Failed delete of old " + regionedits); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9675ad38/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 7ee009c..5644c3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -692,7 +692,7 @@ public class TestHRegion { for (Store store : region.getStores()) { maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); region.getMVCC().advanceTo(seqId); Get get = new Get(row); @@ -746,7 +746,7 @@ public class TestHRegion { for (Store store : region.getStores()) { maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); region.getMVCC().advanceTo(seqId); Get get = new Get(row); @@ -790,7 +790,7 @@ public class TestHRegion { for (Store store : region.getStores()) { maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null); + long seqId = 
region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null); assertEquals(minSeqId, seqId); } @@ -846,7 +846,7 @@ public class TestHRegion { for (Store store : region.getStores()) { maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); // assert that the files are flushed http://git-wip-us.apache.org/repos/asf/hbase/blob/9675ad38/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java index bf72301..c3f21f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java @@ -112,7 +112,7 @@ public class TestRecoveredEdits { // There should be no store files. assertTrue(storeFiles.isEmpty()); region.close(); - Path regionDir = region.getRegionDir(hbaseRootDir, hri); + Path regionDir = region.getWALRegionDir(); Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); // This is a little fragile getting this path to a file of 10M of edits. 
Path recoveredEditsFile = new Path( http://git-wip-us.apache.org/repos/asf/hbase/blob/9675ad38/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index a196f57..5240f2a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -307,7 +307,7 @@ public class TestWALReplay { MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); + Path basedir = FSUtils.getWALTableDir(conf, tableName); deleteDir(basedir); HTableDescriptor htd = createBasic3FamilyHTD(tableName); @@ -934,7 +934,7 @@ public class TestWALReplay { WALSplitter.splitLogFile(hbaseWALRootDir, listStatus[0], this.fs, this.conf, null, null, null, mode, wals); FileStatus[] listStatus1 = this.fs.listStatus( - new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), + new Path(FSUtils.getWALTableDir(this.conf, tableName), new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() { @Override public boolean accept(Path p) { @@ -968,7 +968,7 @@ public class TestWALReplay { final int countPerFamily = 10; final HTableDescriptor htd = createBasic1FamilyHTD(tableName); HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.hbaseWALRootDir, this.conf, htd); - Path regionDir = region1.getRegionFileSystem().getRegionDir(); + Path regionDir = region1.getWALRegionDir(); HBaseTestingUtility.closeRegionAndWAL(region1); WAL wal = createWAL(this.conf); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/9675ad38/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 3c0bade..fb2db2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -172,7 +172,7 @@ public class TestWALFactory { Path oldLogDir = new Path(hbaseWALDir, HConstants.HREGION_OLDLOGDIR_NAME); final int howmany = 3; HRegionInfo[] infos = new HRegionInfo[3]; - Path tabledir = FSUtils.getTableDir(hbaseDir, tableName); + Path tabledir = FSUtils.getWALTableDir(conf, tableName); fs.mkdirs(tabledir); for(int i = 0; i < howmany; i++) { infos[i] = new HRegionInfo(tableName, http://git-wip-us.apache.org/repos/asf/hbase/blob/9675ad38/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 9b9c23d..49f8534 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -247,9 +247,9 @@ public class TestWALSplit { } LOG.debug(ls); LOG.info("Splitting WALs out from under zombie. 
Expecting " + numWriters + " files."); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); LOG.info("Finished splitting out from under zombie."); - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); + Path[] logfiles = getLogForRegion(TABLE_NAME, region); assertEquals("wrong number of split files for region", numWriters, logfiles.length); int count = 0; for (Path logfile: logfiles) { @@ -435,9 +435,9 @@ public class TestWALSplit { generateWALs(1, 10, -1, 0); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -451,9 +451,9 @@ public class TestWALSplit { generateWALs(1, 10, -1, 100); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -478,13 +478,13 @@ public class TestWALSplit { writer.close(); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); // original log should have 10 test edits, 10 region markers, 1 compaction marker assertEquals(21, countWAL(originalLog)); - Path[] splitLog = 
getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName()); + Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName()); assertEquals(1, splitLog.length); assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -499,10 +499,10 @@ public class TestWALSplit { private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException { useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); int result = 0; for (String region : REGIONS) { - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); + Path[] logfiles = getLogForRegion(TABLE_NAME, region); assertEquals(expectedFiles, logfiles.length); int count = 0; for (Path logfile: logfiles) { @@ -633,7 +633,7 @@ public class TestWALSplit { walDirContents.add(status.getPath().getName()); } useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); return walDirContents; } finally { conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, @@ -674,9 +674,9 @@ public class TestWALSplit { corruptWAL(c1, corruption, true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); int actualCount = 0; @@ -710,7 +710,7 @@ public class TestWALSplit { conf.setBoolean(HBASE_SKIP_ERRORS, false); generateWALs(-1); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); assertEquals("wrong number of files in the archive log", NUM_WRITERS, 
archivedLogs.length); } @@ -726,7 +726,7 @@ public class TestWALSplit { throws IOException { generateWALs(-1); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); FileStatus [] statuses = null; try { statuses = fs.listStatus(WALDIR); @@ -756,7 +756,7 @@ public class TestWALSplit { try { InstrumentedLogWriter.activateFailure = true; - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); } catch (IOException e) { assertTrue(e.getMessage(). contains("This exception is instrumented and should only be thrown for testing")); @@ -777,7 +777,7 @@ public class TestWALSplit { Path regiondir = new Path(TABLEDIR, region); fs.delete(regiondir, true); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); assertFalse(fs.exists(regiondir)); } @@ -854,7 +854,7 @@ public class TestWALSplit { useDifferentDFSClient(); try { - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); assertFalse(fs.exists(WALDIR)); } catch (IOException e) { @@ -1077,7 +1077,7 @@ public class TestWALSplit { Path regiondir = new Path(TABLEDIR, REGION); LOG.info("Region directory is" + regiondir); fs.delete(regiondir, true); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); assertFalse(fs.exists(regiondir)); } @@ -1090,7 +1090,7 @@ public class TestWALSplit { injectEmptyFile(".empty", true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME); 
assertFalse(fs.exists(tdir)); @@ -1115,7 +1115,7 @@ public class TestWALSplit { Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); assertEquals(1, fs.listStatus(corruptDir).length); @@ -1145,14 +1145,14 @@ public class TestWALSplit { @Override protected Writer createWriter(Path logfile) throws IOException { - Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile); + Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); // After creating writer, simulate region's // replayRecoveredEditsIfAny() which gets SplitEditFiles of this // region and delete them, excluding files with '.temp' suffix. NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); if (files != null && !files.isEmpty()) { for (Path file : files) { - if (!this.fs.delete(file, false)) { + if (!this.walFS.delete(file, false)) { LOG.error("Failed delete of " + file); } else { LOG.debug("Deleted recovered.edits file=" + file); @@ -1231,9 +1231,9 @@ public class TestWALSplit { - private Path[] getLogForRegion(Path rootdir, TableName table, String region) + private Path[] getLogForRegion(TableName table, String region) throws IOException { - Path tdir = FSUtils.getTableDir(rootdir, table); + Path tdir = FSUtils.getWALTableDir(conf, table); @SuppressWarnings("deprecation") Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, Bytes.toString(region.getBytes())));