HDFS-11402. HDFS Snapshots should capture point-in-time copies of OPEN files. (Manoj Govindassamy via Yongjun Zhang)
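The feature is off by default and is controlled by the new dfs.namenode.snapshot.capture.openfiles key added in this patch. As a rough usage sketch (not part of the patch itself; the class name, paths and file names below are made up, and only APIs that the tests in this commit exercise are used), a client can enable the key, keep a file open under a snapshottable directory, hsync with SyncFlag.UPDATE_LENGTH, and take snapshots that freeze the open file's current length:

    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

    public class OpenFileSnapshotSketch {            // hypothetical class name
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // NameNode-side switch added by this patch; default is false.
        conf.setBoolean(
            DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
        try {
          DistributedFileSystem fs = cluster.getFileSystem();
          Path dir = new Path("/logs");              // hypothetical snapshot root
          fs.mkdirs(dir);
          fs.allowSnapshot(dir);

          // Open a file and write some data, but do not close it.
          FSDataOutputStream out = fs.create(new Path(dir, "flume.log"));
          out.write(new byte[1024]);
          // Persist the current length in the NameNode so the snapshot sees it.
          ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));

          // Snapshot s1 captures the open file's length known to the NameNode.
          fs.createSnapshot(dir, "s1");
          long s1Len = fs.getFileStatus(
              new Path(dir, ".snapshot/s1/flume.log")).getLen();

          // Keep writing to the live file; the snapshot copy stays frozen.
          out.write(new byte[1024]);
          ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
          long s1LenLater = fs.getFileStatus(
              new Path(dir, ".snapshot/s1/flume.log")).getLen();
          assert s1Len == s1LenLater;                // point-in-time copy
          out.close();
        } finally {
          cluster.shutdown();
        }
      }
    }

Without the key enabled, or without the UPDATE_LENGTH sync, the length visible in the snapshot may lag what the client has already written, as the hdfs-default.xml description later in this patch notes.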
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/20e3ae26
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/20e3ae26
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/20e3ae26

Branch: refs/heads/YARN-2915
Commit: 20e3ae260b40cd6ef657b2a629a02219d68f162f
Parents: 3721cfe
Author: Yongjun Zhang <yzh...@cloudera.com>
Authored: Fri Apr 21 20:19:20 2017 -0700
Committer: Yongjun Zhang <yzh...@cloudera.com>
Committed: Fri Apr 21 20:35:48 2017 -0700

----------------------------------------------------------------------
 .../hdfs/client/HdfsClientConfigKeys.java       |   4 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 .../hdfs/server/namenode/FSDirSnapshotOp.java   |   7 +-
 .../hdfs/server/namenode/FSEditLogLoader.java   |   5 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   2 +-
 .../hdfs/server/namenode/INodeDirectory.java    |   8 +-
 .../hdfs/server/namenode/INodesInPath.java      |  78 ++++-
 .../hdfs/server/namenode/LeaseManager.java      | 103 ++++++-
 .../snapshot/DirectorySnapshottableFeature.java |  15 +-
 .../namenode/snapshot/SnapshotManager.java      |  24 +-
 .../src/main/resources/hdfs-default.xml         |  17 ++
 .../hdfs/server/namenode/TestLeaseManager.java  | 286 +++++++++++++++++-
 .../snapshot/TestOpenFilesWithSnapshot.java     | 298 +++++++++++++++++++
 .../snapshot/TestSnapshotDiffReport.java        | 188 ++++++++++--
 .../namenode/snapshot/TestSnapshotManager.java  |  11 +-
 15 files changed, 994 insertions(+), 57 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index c152a4b..fbc8d89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -171,6 +171,10 @@ public interface HdfsClientConfigKeys {
       "dfs.data.transfer.client.tcpnodelay";
   boolean DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT = true;

+  String DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES =
+      "dfs.namenode.snapshot.capture.openfiles";
+  boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false;
+
   /**
    * These are deprecated config keys to client code.
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 6ff7e5a..3fa383b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -352,6 +352,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY = "dfs.namenode.startup.delay.block.deletion.sec"; public static final long DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT = 0L; + public static final String DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES = + HdfsClientConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES; + public static final boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = + HdfsClientConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT; + // Whether to enable datanode's stale state detection and usage for reads public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode"; public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java index ff076e4..9dd75bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java @@ -97,12 +97,13 @@ class FSDirSnapshotOp { throw new InvalidPathException("Invalid snapshot name: " + snapshotName); } - String snapshotPath = null; + String snapshotPath; verifySnapshotName(fsd, snapshotName, snapshotRoot); fsd.writeLock(); try { - snapshotPath = snapshotManager.createSnapshot(iip, snapshotRoot, - snapshotName); + snapshotPath = snapshotManager.createSnapshot( + fsd.getFSNamesystem().getLeaseManager(), + iip, snapshotRoot, snapshotName); } finally { fsd.writeUnlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index ff36f18..ae0b304 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -746,8 +746,9 @@ public class FSEditLogLoader { renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot, logVersion); 
INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE); - String path = fsNamesys.getSnapshotManager().createSnapshot(iip, - snapshotRoot, createSnapshotOp.snapshotName); + String path = fsNamesys.getSnapshotManager().createSnapshot( + fsDir.getFSNamesystem().getLeaseManager(), + iip, snapshotRoot, createSnapshotOp.snapshotName); if (toAddRetryCache) { fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId, createSnapshotOp.rpcCallId, path); http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index e24778f..3dbfdf9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -844,7 +844,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, this.dtSecretManager = createDelegationTokenSecretManager(conf); this.dir = new FSDirectory(this, conf); - this.snapshotManager = new SnapshotManager(dir); + this.snapshotManager = new SnapshotManager(conf, dir); this.cacheManager = new CacheManager(this, conf, blockManager); this.ecPolicyManager = new ErasureCodingPolicyManager(conf); this.topConf = new TopConf(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java index b6e2713..a4098bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java @@ -256,9 +256,11 @@ public class INodeDirectory extends INodeWithAdditionalFields getDirectorySnapshottableFeature().setSnapshotQuota(snapshotQuota); } - public Snapshot addSnapshot(int id, String name) throws SnapshotException, - QuotaExceededException { - return getDirectorySnapshottableFeature().addSnapshot(this, id, name); + public Snapshot addSnapshot(int id, String name, + final LeaseManager leaseManager, final boolean captureOpenFiles) + throws SnapshotException, QuotaExceededException { + return getDirectorySnapshottableFeature().addSnapshot(this, id, name, + leaseManager, captureOpenFiles); } public Snapshot removeSnapshot( http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java index 1d5dbf6..abc8b63 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java @@ -49,24 +49,62 @@ public class INodesInPath { Arrays.equals(HdfsServerConstants.DOT_SNAPSHOT_DIR_BYTES, pathComponent); } - static INodesInPath fromINode(INode inode) { + private static INode[] getINodes(final INode inode) { int depth = 0, index; INode tmp = inode; while (tmp != null) { depth++; tmp = tmp.getParent(); } - final byte[][] path = new byte[depth][]; - final INode[] inodes = new INode[depth]; + INode[] inodes = new INode[depth]; tmp = inode; index = depth; while (tmp != null) { index--; - path[index] = tmp.getKey(); inodes[index] = tmp; tmp = tmp.getParent(); } - return new INodesInPath(inodes, path); + return inodes; + } + + private static byte[][] getPaths(final INode[] inodes) { + byte[][] paths = new byte[inodes.length][]; + for (int i = 0; i < inodes.length; i++) { + paths[i] = inodes[i].getKey(); + } + return paths; + } + + /** + * Construct {@link INodesInPath} from {@link INode}. + * + * @param inode to construct from + * @return INodesInPath + */ + static INodesInPath fromINode(INode inode) { + INode[] inodes = getINodes(inode); + byte[][] paths = getPaths(inodes); + return new INodesInPath(inodes, paths); + } + + /** + * Construct {@link INodesInPath} from {@link INode} and its root + * {@link INodeDirectory}. INodesInPath constructed this way will + * each have its snapshot and latest snapshot id filled in. + * + * This routine is specifically for + * {@link LeaseManager#getINodeWithLeases(INodeDirectory)} to get + * open files along with their snapshot details which is used during + * new snapshot creation to capture their meta data. + * + * @param rootDir the root {@link INodeDirectory} under which inode + * needs to be resolved + * @param inode the {@link INode} to be resolved + * @return INodesInPath + */ + static INodesInPath fromINode(final INodeDirectory rootDir, INode inode) { + byte[][] paths = getPaths(getINodes(inode)); + return resolve(rootDir, paths); } static INodesInPath fromComponents(byte[][] components) { @@ -382,6 +420,36 @@ public class INodesInPath { } /** + * Verify if this {@link INodesInPath} is a descendant of the + * requested {@link INodeDirectory}. + * + * @param inodeDirectory the ancestor directory + * @return true if this INodesInPath is a descendant of inodeDirectory + */ + public boolean isDescendant(final INodeDirectory inodeDirectory) { + final INodesInPath dirIIP = fromINode(inodeDirectory); + return isDescendant(dirIIP); + } + + private boolean isDescendant(final INodesInPath ancestorDirIIP) { + int ancestorDirINodesLength = ancestorDirIIP.length(); + int myParentINodesLength = length() - 1; + if (myParentINodesLength < ancestorDirINodesLength) { + return false; + } + + int index = 0; + while (index < ancestorDirINodesLength) { + if (inodes[index] != ancestorDirIIP.getINode(index)) { + return false; + } + index++; + } + return true; + } + + + /** * @return a new INodesInPath instance that only contains existing INodes. * Note that this method only handles non-snapshot paths. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index a9fb24b..f78eef3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -28,9 +28,15 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.PriorityQueue; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -41,6 +47,7 @@ import org.apache.hadoop.util.Daemon; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.util.Time; /** * LeaseManager does the lease housekeeping for writing on files. @@ -67,16 +74,14 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Private public class LeaseManager { public static final Log LOG = LogFactory.getLog(LeaseManager.class); - private final FSNamesystem fsnamesystem; - private long softLimit = HdfsConstants.LEASE_SOFTLIMIT_PERIOD; private long hardLimit = HdfsConstants.LEASE_HARDLIMIT_PERIOD; + static final int INODE_FILTER_WORKER_COUNT_MAX = 4; + static final int INODE_FILTER_WORKER_TASK_MIN = 512; - // // Used for handling lock-leases // Mapping: leaseHolder -> Lease - // private final SortedMap<String, Lease> leases = new TreeMap<>(); // Set of: Lease private final PriorityQueue<Lease> sortedLeases = new PriorityQueue<>(512, @@ -129,6 +134,96 @@ public class LeaseManager { Collection<Long> getINodeIdWithLeases() {return leasesById.keySet();} + /** + * Get {@link INodesInPath} for all {@link INode} in the system + * which has a valid lease. + * + * @return Set<INodesInPath> + */ + public Set<INodesInPath> getINodeWithLeases() { + return getINodeWithLeases(null); + } + + private synchronized INode[] getINodesWithLease() { + int inodeCount = 0; + INode[] inodes = new INode[leasesById.size()]; + for (long inodeId : leasesById.keySet()) { + inodes[inodeCount] = fsnamesystem.getFSDirectory().getInode(inodeId); + inodeCount++; + } + return inodes; + } + + /** + * Get {@link INodesInPath} for all files under the ancestor directory which + * has valid lease. If the ancestor directory is null, then return all files + * in the system with valid lease. Callers must hold {@link FSNamesystem} + * read or write lock. 
+ * + * @param ancestorDir the ancestor {@link INodeDirectory} + * @return Set<INodesInPath> + */ + public Set<INodesInPath> getINodeWithLeases(final INodeDirectory + ancestorDir) { + assert fsnamesystem.hasReadLock(); + final long startTimeMs = Time.monotonicNow(); + Set<INodesInPath> iipSet = new HashSet<>(); + final INode[] inodes = getINodesWithLease(); + int inodeCount = inodes.length; + if (inodeCount == 0) { + return iipSet; + } + + List<Future<List<INodesInPath>>> futureList = Lists.newArrayList(); + final int workerCount = Math.min(INODE_FILTER_WORKER_COUNT_MAX, + (((inodeCount - 1) / INODE_FILTER_WORKER_TASK_MIN) + 1)); + ExecutorService inodeFilterService = + Executors.newFixedThreadPool(workerCount); + for (int workerIdx = 0; workerIdx < workerCount; workerIdx++) { + final int startIdx = workerIdx; + Callable<List<INodesInPath>> c = new Callable<List<INodesInPath>>() { + @Override + public List<INodesInPath> call() { + List<INodesInPath> iNodesInPaths = Lists.newArrayList(); + for (int idx = startIdx; idx < inodeCount; idx += workerCount) { + INode inode = inodes[idx]; + if (!inode.isFile()) { + continue; + } + INodesInPath inodesInPath = INodesInPath.fromINode( + fsnamesystem.getFSDirectory().getRoot(), inode.asFile()); + if (ancestorDir != null && + !inodesInPath.isDescendant(ancestorDir)) { + continue; + } + iNodesInPaths.add(inodesInPath); + } + return iNodesInPaths; + } + }; + + // Submit the inode filter task to the Executor Service + futureList.add(inodeFilterService.submit(c)); + } + inodeFilterService.shutdown(); + + for (Future<List<INodesInPath>> f : futureList) { + try { + iipSet.addAll(f.get()); + } catch (Exception e) { + LOG.warn("INode filter task encountered exception: ", e); + } + } + final long endTimeMs = Time.monotonicNow(); + if ((endTimeMs - startTimeMs) > 1000) { + LOG.info("Took " + (endTimeMs - startTimeMs) + " ms to collect " + + iipSet.size() + " open files with leases" + + ((ancestorDir != null) ? 
+ " under " + ancestorDir.getFullPathName() : ".")); + } + return iipSet; + } + /** @return the lease containing src */ public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());} http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java index fa7bace..3039ad3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Set; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; @@ -39,6 +40,8 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeReference; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName; +import org.apache.hadoop.hdfs.server.namenode.INodesInPath; +import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.hdfs.util.Diff.ListType; import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.util.Time; @@ -163,7 +166,8 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature } /** Add a snapshot. 
*/ - public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name) + public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name, + final LeaseManager leaseManager, final boolean captureOpenFiles) throws SnapshotException, QuotaExceededException { //check snapshot quota final int n = getNumSnapshots(); @@ -188,6 +192,15 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature final long now = Time.now(); snapshotRoot.updateModificationTime(now, Snapshot.CURRENT_STATE_ID); s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID); + + if (captureOpenFiles) { + Set<INodesInPath> openFilesIIP = + leaseManager.getINodeWithLeases(snapshotRoot); + for (INodesInPath openFileIIP : openFilesIIP) { + INodeFile openFile = openFileIIP.getLastINode().asFile(); + openFile.recordModification(openFileIIP.getLatestSnapshotId()); + } + } return s; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java index 8ad7824..ffc203f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.namenode.snapshot; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -29,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.management.ObjectName; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -43,6 +47,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INodeDirectory; import org.apache.hadoop.hdfs.server.namenode.INodesInPath; +import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.metrics2.util.MBeans; import com.google.common.base.Preconditions; @@ -60,20 +65,23 @@ import com.google.common.base.Preconditions; * if necessary. */ public class SnapshotManager implements SnapshotStatsMXBean { - private boolean allowNestedSnapshots = false; private final FSDirectory fsdir; - private static final int SNAPSHOT_ID_BIT_WIDTH = 24; - + private final boolean captureOpenFiles; private final AtomicInteger numSnapshots = new AtomicInteger(); + private static final int SNAPSHOT_ID_BIT_WIDTH = 24; + private boolean allowNestedSnapshots = false; private int snapshotCounter = 0; /** All snapshottable directories in the namesystem. 
   */
  private final Map<Long, INodeDirectory> snapshottables =
      new HashMap<Long, INodeDirectory>();

-  public SnapshotManager(final FSDirectory fsdir) {
+  public SnapshotManager(final Configuration conf, final FSDirectory fsdir) {
     this.fsdir = fsdir;
+    this.captureOpenFiles = conf.getBoolean(
+        DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES,
+        DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT);
   }

   /** Used in tests only */
@@ -203,8 +211,9 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    *           snapshot with the given name for the directory, and/or 3)
    *           snapshot number exceeds quota
    */
-  public String createSnapshot(final INodesInPath iip, String snapshotRoot,
-      String snapshotName) throws IOException {
+  public String createSnapshot(final LeaseManager leaseManager,
+      final INodesInPath iip, String snapshotRoot, String snapshotName)
+      throws IOException {
     INodeDirectory srcRoot = getSnapshottableRoot(iip);

     if (snapshotCounter == getMaxSnapshotID()) {
@@ -216,7 +225,8 @@ public class SnapshotManager implements SnapshotStatsMXBean {
           "snapshot IDs and ID rollover is not supported.");
     }

-    srcRoot.addSnapshot(snapshotCounter, snapshotName);
+    srcRoot.addSnapshot(snapshotCounter, snapshotName, leaseManager,
+        this.captureOpenFiles);

     //create success, update id
     snapshotCounter++;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 5365cac..7fcea01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4154,6 +4154,23 @@
   </description>
 </property>

+<property>
+  <name>dfs.namenode.snapshot.capture.openfiles</name>
+  <value>false</value>
+  <description>
+    If true, snapshots taken will have an immutable shared copy of
+    the open files that have valid leases. Even after the open files
+    grow or shrink in size, the snapshot will always retain the previous
+    point-in-time version of the open files, just like all other
+    closed files. Default is false.
+    Note: The file length captured for open files in a snapshot is
+    what is recorded in the NameNode at the time of the snapshot, and
+    it may be shorter than what the client has written so far. To
+    capture the latest length, the client can call hflush/hsync with
+    the flag SyncFlag.UPDATE_LENGTH on the open file handles.
+ </description> + </property> + <property> <name>dfs.pipeline.ecn</name> <value>false</value> http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java index 6692090..9adb071 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java @@ -20,24 +20,33 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import com.google.common.collect.Lists; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.*; @@ -69,19 +78,21 @@ public class TestLeaseManager { /** Check that LeaseManager.checkLease release some leases */ @Test - public void testCheckLease() { + public void testCheckLease() throws InterruptedException { LeaseManager lm = new LeaseManager(makeMockFsNameSystem()); - - long numLease = 100; + final long numLease = 100; + final long expiryTime = 0; + final long waitTime = expiryTime + 1; //Make sure the leases we are going to add exceed the hard limit - lm.setLeasePeriod(0, 0); + lm.setLeasePeriod(expiryTime, expiryTime); for (long i = 0; i <= numLease - 1; i++) { //Add some leases to the LeaseManager lm.addLease("holder"+i, INodeId.ROOT_INODE_ID + i); } assertEquals(numLease, lm.countLease()); + Thread.sleep(waitTime); //Initiate a call to checkLease. This should exit within the test timeout lm.checkLeases(); @@ -156,10 +167,271 @@ public class TestLeaseManager { } } + /** + * Test leased files counts from + * {@link LeaseManager#getINodeWithLeases()}, + * {@link LeaseManager#getINodeIdWithLeases()} and + * {@link LeaseManager#getINodeWithLeases(INodeDirectory)}. 
+ */ + @Test (timeout = 60000) + public void testInodeWithLeases() throws Exception { + FSNamesystem fsNamesystem = makeMockFsNameSystem(); + FSDirectory fsDirectory = fsNamesystem.getFSDirectory(); + LeaseManager lm = new LeaseManager(fsNamesystem); + Set<Long> iNodeIds = new HashSet<>(Arrays.asList( + INodeId.ROOT_INODE_ID + 1, + INodeId.ROOT_INODE_ID + 2, + INodeId.ROOT_INODE_ID + 3, + INodeId.ROOT_INODE_ID + 4 + )); + final PermissionStatus perm = PermissionStatus.createImmutable( + "user", "group", FsPermission.createImmutable((short)0755)); + INodeDirectory rootInodeDirectory = new INodeDirectory( + HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""), + perm, 0L); + when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory); + verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0); + + for (Long iNodeId : iNodeIds) { + INodeFile iNodeFile = stubInodeFile(iNodeId); + iNodeFile.setParent(rootInodeDirectory); + when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); + lm.addLease("holder_" + iNodeId, iNodeId); + } + verifyINodeLeaseCounts(lm, rootInodeDirectory, iNodeIds.size(), + iNodeIds.size(), iNodeIds.size()); + + for (Long iNodeId : iNodeIds) { + lm.removeLease(iNodeId); + } + verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0); + } + + /** + * Test leased files counts at various scale from + * {@link LeaseManager#getINodeWithLeases()}, + * {@link LeaseManager#getINodeIdWithLeases()} and + * {@link LeaseManager#getINodeWithLeases(INodeDirectory)}. + */ + @Test (timeout = 240000) + public void testInodeWithLeasesAtScale() throws Exception { + FSNamesystem fsNamesystem = makeMockFsNameSystem(); + FSDirectory fsDirectory = fsNamesystem.getFSDirectory(); + LeaseManager lm = new LeaseManager(fsNamesystem); + + final PermissionStatus perm = PermissionStatus.createImmutable( + "user", "group", FsPermission.createImmutable((short)0755)); + INodeDirectory rootInodeDirectory = new INodeDirectory( + HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""), + perm, 0L); + when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory); + + // Case 1: No open files + int scale = 0; + testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale); + + for (int workerCount = 1; + workerCount <= LeaseManager.INODE_FILTER_WORKER_COUNT_MAX / 2; + workerCount++) { + // Case 2: Open files count is half of worker task size + scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN / 2; + testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + rootInodeDirectory, scale); + + // Case 3: Open files count is 1 less of worker task size + scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN - 1; + testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + rootInodeDirectory, scale); + + // Case 4: Open files count is equal to worker task size + scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN; + testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + rootInodeDirectory, scale); + + // Case 5: Open files count is 1 more than worker task size + scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN + 1; + testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + rootInodeDirectory, scale); + } + + // Case 6: Open files count is way more than worker count + scale = 1279; + testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale); + } + + private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager, + final FSDirectory fsDirectory, INodeDirectory ancestorDirectory, + int scale) { + verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 
0, 0, 0); + + Set<Long> iNodeIds = new HashSet<>(); + for (int i = 0; i < scale; i++) { + iNodeIds.add(INodeId.ROOT_INODE_ID + i); + } + for (Long iNodeId : iNodeIds) { + INodeFile iNodeFile = stubInodeFile(iNodeId); + iNodeFile.setParent(ancestorDirectory); + when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); + leaseManager.addLease("holder_" + iNodeId, iNodeId); + } + verifyINodeLeaseCounts(leaseManager, ancestorDirectory, iNodeIds.size(), + iNodeIds.size(), iNodeIds.size()); + + leaseManager.removeAllLeases(); + verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0); + } + + /** + * Verify leased INode details across lease get and release from + * {@link LeaseManager#getINodeIdWithLeases()} and + * {@link LeaseManager#getINodeWithLeases(INodeDirectory)}. + */ + @Test (timeout = 60000) + public void testInodeWithLeasesForAncestorDir() throws Exception { + FSNamesystem fsNamesystem = makeMockFsNameSystem(); + FSDirectory fsDirectory = fsNamesystem.getFSDirectory(); + LeaseManager lm = new LeaseManager(fsNamesystem); + + final PermissionStatus perm = PermissionStatus.createImmutable( + "user", "group", FsPermission.createImmutable((short)0755)); + INodeDirectory rootInodeDirectory = new INodeDirectory( + HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""), + perm, 0L); + when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory); + + AtomicInteger inodeIds = new AtomicInteger( + (int) (HdfsConstants.GRANDFATHER_INODE_ID + 1234)); + String[] pathTree = new String[] { + "/root.log", + "/ENG/a/a1.log", + "/ENG/a/b/b1.log", + "/ENG/a/b/c/c1.log", + "/ENG/a/b/c/c2.log", + "/OPS/m/m1.log", + "/OPS/m/n/n1.log", + "/OPS/m/n/n2.log" + }; + Map<String, INode> pathINodeMap = createINodeTree(rootInodeDirectory, + pathTree, inodeIds); + + assertEquals(0, lm.getINodeIdWithLeases().size()); + for (Entry<String, INode> entry : pathINodeMap.entrySet()) { + long iNodeId = entry.getValue().getId(); + when(fsDirectory.getInode(iNodeId)).thenReturn(entry.getValue()); + if (entry.getKey().contains("log")) { + lm.addLease("holder_" + iNodeId, iNodeId); + } + } + assertEquals(pathTree.length, lm.getINodeIdWithLeases().size()); + assertEquals(pathTree.length, lm.getINodeWithLeases().size()); + assertEquals(pathTree.length, lm.getINodeWithLeases( + rootInodeDirectory).size()); + + // reset + lm.removeAllLeases(); + + Set<String> filesLeased = new HashSet<>( + Arrays.asList("root.log", "a1.log", "c1.log", "n2.log")); + for (String fileName : filesLeased) { + lm.addLease("holder", pathINodeMap.get(fileName).getId()); + } + assertEquals(filesLeased.size(), lm.getINodeIdWithLeases().size()); + assertEquals(filesLeased.size(), lm.getINodeWithLeases().size()); + Set<INodesInPath> iNodeWithLeases = lm.getINodeWithLeases(); + for (INodesInPath iNodesInPath : iNodeWithLeases) { + String leasedFileName = DFSUtil.bytes2String( + iNodesInPath.getLastLocalName()); + assertTrue(filesLeased.contains(leasedFileName)); + } + + assertEquals(filesLeased.size(), + lm.getINodeWithLeases(rootInodeDirectory).size()); + assertEquals(filesLeased.size() - 2, + lm.getINodeWithLeases(pathINodeMap.get("ENG").asDirectory()).size()); + assertEquals(filesLeased.size() - 2, + lm.getINodeWithLeases(pathINodeMap.get("a").asDirectory()).size()); + assertEquals(filesLeased.size() - 3, + lm.getINodeWithLeases(pathINodeMap.get("c").asDirectory()).size()); + assertEquals(filesLeased.size() - 3, + lm.getINodeWithLeases(pathINodeMap.get("OPS").asDirectory()).size()); + assertEquals(filesLeased.size() - 3, + 
lm.getINodeWithLeases(pathINodeMap.get("n").asDirectory()).size()); + + lm.removeLease(pathINodeMap.get("n2.log").getId()); + assertEquals(filesLeased.size() - 1, + lm.getINodeWithLeases(rootInodeDirectory).size()); + assertEquals(filesLeased.size() - 4, + lm.getINodeWithLeases(pathINodeMap.get("n").asDirectory()).size()); + + lm.removeAllLeases(); + filesLeased.clear(); + assertEquals(filesLeased.size(), + lm.getINodeWithLeases(rootInodeDirectory).size()); + + } + + private void verifyINodeLeaseCounts(final LeaseManager leaseManager, + INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount, + int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) { + assertEquals(iNodeIdWithLeaseCount, + leaseManager.getINodeIdWithLeases().size()); + assertEquals(iNodeWithLeaseCount, + leaseManager.getINodeWithLeases().size()); + assertEquals(iNodeUnderAncestorLeaseCount, + leaseManager.getINodeWithLeases(ancestorDirectory).size()); + } + + private Map<String, INode> createINodeTree(INodeDirectory parentDir, + String[] pathTree, AtomicInteger inodeId) + throws QuotaExceededException { + HashMap<String, INode> pathINodeMap = new HashMap<>(); + for (String path : pathTree) { + byte[][] components = INode.getPathComponents(path); + FsPermission perm = FsPermission.createImmutable((short) 0755); + PermissionStatus permStatus = + PermissionStatus.createImmutable("", "", perm); + + INodeDirectory prev = parentDir; + INodeDirectory dir = null; + for (int i = 0; i < components.length - 1; i++) { + byte[] component = components[i]; + if (component.length == 0) { + continue; + } + INode existingChild = prev.getChild( + component, Snapshot.CURRENT_STATE_ID); + if (existingChild == null) { + String dirName = DFSUtil.bytes2String(component); + dir = new INodeDirectory(inodeId.incrementAndGet(), component, + permStatus, 0); + prev.addChild(dir, false, Snapshot.CURRENT_STATE_ID); + pathINodeMap.put(dirName, dir); + prev = dir; + } else { + assertTrue(existingChild.isDirectory()); + prev = existingChild.asDirectory(); + } + } + + PermissionStatus p = new PermissionStatus( + "user", "group", new FsPermission((short) 0777)); + byte[] fileNameBytes = components[components.length - 1]; + String fileName = DFSUtil.bytes2String(fileNameBytes); + INodeFile iNodeFile = new INodeFile( + inodeId.incrementAndGet(), fileNameBytes, + p, 0L, 0L, BlockInfo.EMPTY_ARRAY, (short) 1, 1L); + iNodeFile.setParent(prev); + pathINodeMap.put(fileName, iNodeFile); + } + return pathINodeMap; + } + + private static FSNamesystem makeMockFsNameSystem() { FSDirectory dir = mock(FSDirectory.class); FSNamesystem fsn = mock(FSNamesystem.class); when(fsn.isRunning()).thenReturn(true); + when(fsn.hasReadLock()).thenReturn(true); when(fsn.hasWriteLock()).thenReturn(true); when(fsn.getFSDirectory()).thenReturn(dir); when(fsn.getMaxLockHoldToReleaseLeaseMs()).thenReturn(maxLockHoldToReleaseLeaseMs); @@ -170,7 +442,7 @@ public class TestLeaseManager { PermissionStatus p = new PermissionStatus( "dummy", "dummy", new FsPermission((short) 0777)); return new INodeFile( - inodeId, "/foo".getBytes(), p, 0L, 0L, + inodeId, new String("foo-" + inodeId).getBytes(), p, 0L, 0L, BlockInfo.EMPTY_ARRAY, (short) 1, 1L); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java index 7b7c34a..7aaadf8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java @@ -18,21 +18,28 @@ package org.apache.hadoop.hdfs.server.namenode.snapshot; import java.io.IOException; +import java.util.Arrays; import java.util.EnumSet; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -41,8 +48,16 @@ public class TestOpenFilesWithSnapshot { MiniDFSCluster cluster = null; DistributedFileSystem fs = null; + private static final long SEED = 0; + private static final short REPLICATION = 3; + private static final long BLOCKSIZE = 1024; + private static final long BUFFERLEN = BLOCKSIZE / 2; + private static final long FILELEN = BLOCKSIZE * 2; + @Before public void setup() throws IOException { + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); conf.set("dfs.blocksize", "1048576"); fs = cluster.getFileSystem(); @@ -198,6 +213,289 @@ public class TestOpenFilesWithSnapshot { restartNameNode(); } + private void createFile(final Path filePath) throws IOException { + DFSTestUtil.createFile(fs, filePath, (int) BUFFERLEN, + FILELEN, BLOCKSIZE, REPLICATION, SEED); + } + + private int writeToStream(final FSDataOutputStream outputStream, byte[] buf) + throws IOException { + outputStream.write(buf); + ((HdfsDataOutputStream)outputStream).hsync( + EnumSet.of(SyncFlag.UPDATE_LENGTH)); + return buf.length; + } + + /** + * Test open files under snapshot directories are getting captured + * in snapshots as a truly immutable copy. Verify open files outside + * of snapshot directory not getting affected. 
+ * + * \- level_0_A + * \- level_1_C + * +- appA.log (open file, not under snap root) + * \- level_2_E (Snapshottable Dir) + * \- level_3_G + * +- flume.log (open file, under snap root) + * \- level_0_B + * +- appB.log (open file, not under snap root) + * \- level_2_D (Snapshottable Dir) + * +- hbase.log (open file, under snap root) + */ + @Test (timeout = 120000) + public void testPointInTimeSnapshotCopiesForOpenFiles() throws Exception { + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, + true); + // Construct the directory tree + final Path level0A = new Path("/level_0_A"); + final Path level0B = new Path("/level_0_B"); + final Path level1C = new Path(level0A, "level_1_C"); + final Path level1D = new Path(level0B, "level_1_D"); + final Path level2E = new Path(level1C, "level_2_E"); + final Path level3G = new Path(level2E, "level_3_G"); + Set<Path> dirPaths = new HashSet<>(Arrays.asList(level0A, level0B, + level1C, level1D, level2E, level3G)); + for (Path dirPath : dirPaths) { + fs.mkdirs(dirPath); + } + + // String constants + final Path flumeSnapRootDir = level2E; + final Path hbaseSnapRootDir = level1D; + final String flumeFileName = "flume.log"; + final String hbaseFileName = "hbase.log"; + final String appAFileName = "appA.log"; + final String appBFileName = "appB.log"; + final String flumeSnap1Name = "flume_snap_s1"; + final String flumeSnap2Name = "flume_snap_s2"; + final String flumeSnap3Name = "flume_snap_s3"; + final String hbaseSnap1Name = "hbase_snap_s1"; + final String hbaseSnap2Name = "hbase_snap_s2"; + final String hbaseSnap3Name = "hbase_snap_s3"; + final String flumeRelPathFromSnapDir = "level_3_G/" + flumeFileName; + + // Create files and open a stream + final Path flumeFile = new Path(level3G, flumeFileName); + createFile(flumeFile); + FSDataOutputStream flumeOutputStream = fs.append(flumeFile); + + final Path hbaseFile = new Path(level1D, hbaseFileName); + createFile(hbaseFile); + FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile); + + final Path appAFile = new Path(level1C, appAFileName); + createFile(appAFile); + FSDataOutputStream appAOutputStream = fs.append(appAFile); + + final Path appBFile = new Path(level0B, appBFileName); + createFile(appBFile); + FSDataOutputStream appBOutputStream = fs.append(appBFile); + + final long appAFileInitialLength = fs.getFileStatus(appAFile).getLen(); + final long appBFileInitialLength = fs.getFileStatus(appBFile).getLen(); + + // Create Snapshot S1 + final Path flumeS1Dir = SnapshotTestHelper.createSnapshot( + fs, flumeSnapRootDir, flumeSnap1Name); + final Path flumeS1Path = new Path(flumeS1Dir, flumeRelPathFromSnapDir); + final Path hbaseS1Dir = SnapshotTestHelper.createSnapshot( + fs, hbaseSnapRootDir, hbaseSnap1Name); + final Path hbaseS1Path = new Path(hbaseS1Dir, hbaseFileName); + + final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen(); + final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen(); + + // Verify if Snap S1 file lengths are same as the the live ones + Assert.assertEquals(flumeFileLengthAfterS1, + fs.getFileStatus(flumeS1Path).getLen()); + Assert.assertEquals(hbaseFileLengthAfterS1, + fs.getFileStatus(hbaseS1Path).getLen()); + Assert.assertEquals(appAFileInitialLength, + fs.getFileStatus(appAFile).getLen()); + Assert.assertEquals(appBFileInitialLength, + fs.getFileStatus(appBFile).getLen()); + + long flumeFileWrittenDataLength = flumeFileLengthAfterS1; + long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1; + long appAFileWrittenDataLength = 
appAFileInitialLength; + + int newWriteLength = (int) (BLOCKSIZE * 1.5); + byte[] buf = new byte[newWriteLength]; + Random random = new Random(); + random.nextBytes(buf); + + // Write more data to flume and hbase files only + flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); + hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf); + + // Create Snapshot S2 + final Path flumeS2Dir = SnapshotTestHelper.createSnapshot( + fs, flumeSnapRootDir, flumeSnap2Name); + final Path flumeS2Path = new Path(flumeS2Dir, flumeRelPathFromSnapDir); + final Path hbaseS2Dir = SnapshotTestHelper.createSnapshot( + fs, hbaseSnapRootDir, hbaseSnap2Name); + final Path hbaseS2Path = new Path(hbaseS2Dir, hbaseFileName); + + // Verify live files lengths are same as all data written till now + final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen(); + final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen(); + Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2); + Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2); + + // Verify if Snap S2 file lengths are same as the live ones + Assert.assertEquals(flumeFileLengthAfterS2, + fs.getFileStatus(flumeS2Path).getLen()); + Assert.assertEquals(hbaseFileLengthAfterS2, + fs.getFileStatus(hbaseS2Path).getLen()); + Assert.assertEquals(appAFileInitialLength, + fs.getFileStatus(appAFile).getLen()); + Assert.assertEquals(appBFileInitialLength, + fs.getFileStatus(appBFile).getLen()); + + // Write more data to appA file only + newWriteLength = (int) (BLOCKSIZE * 2.5); + buf = new byte[newWriteLength]; + random.nextBytes(buf); + appAFileWrittenDataLength += writeToStream(appAOutputStream, buf); + + // Verify other open files are not affected in their snapshots + Assert.assertEquals(flumeFileLengthAfterS2, + fs.getFileStatus(flumeS2Path).getLen()); + Assert.assertEquals(appAFileWrittenDataLength, + fs.getFileStatus(appAFile).getLen()); + + // Write more data to flume file only + newWriteLength = (int) (BLOCKSIZE * 2.5); + buf = new byte[newWriteLength]; + random.nextBytes(buf); + flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); + + // Create Snapshot S3 + final Path flumeS3Dir = SnapshotTestHelper.createSnapshot( + fs, flumeSnapRootDir, flumeSnap3Name); + final Path flumeS3Path = new Path(flumeS3Dir, flumeRelPathFromSnapDir); + final Path hbaseS3Dir = SnapshotTestHelper.createSnapshot( + fs, hbaseSnapRootDir, hbaseSnap3Name); + final Path hbaseS3Path = new Path(hbaseS3Dir, hbaseFileName); + + // Verify live files lengths are same as all data written till now + final long flumeFileLengthAfterS3 = fs.getFileStatus(flumeFile).getLen(); + final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen(); + Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS3); + Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3); + + // Verify if Snap S3 file lengths are same as the live ones + Assert.assertEquals(flumeFileLengthAfterS3, + fs.getFileStatus(flumeS3Path).getLen()); + Assert.assertEquals(hbaseFileLengthAfterS3, + fs.getFileStatus(hbaseS3Path).getLen()); + Assert.assertEquals(appAFileWrittenDataLength, + fs.getFileStatus(appAFile).getLen()); + Assert.assertEquals(appBFileInitialLength, + fs.getFileStatus(appBFile).getLen()); + + // Verify old flume snapshots have point-in-time / frozen file lengths + // even after the live file have moved forward. 
+ Assert.assertEquals(flumeFileLengthAfterS1, + fs.getFileStatus(flumeS1Path).getLen()); + Assert.assertEquals(flumeFileLengthAfterS2, + fs.getFileStatus(flumeS2Path).getLen()); + Assert.assertEquals(flumeFileLengthAfterS3, + fs.getFileStatus(flumeS3Path).getLen()); + + // Verify old hbase snapshots have point-in-time / frozen file lengths + // even after the live files have moved forward. + Assert.assertEquals(hbaseFileLengthAfterS1, + fs.getFileStatus(hbaseS1Path).getLen()); + Assert.assertEquals(hbaseFileLengthAfterS2, + fs.getFileStatus(hbaseS2Path).getLen()); + Assert.assertEquals(hbaseFileLengthAfterS3, + fs.getFileStatus(hbaseS3Path).getLen()); + + flumeOutputStream.close(); + hbaseOutputStream.close(); + appAOutputStream.close(); + appBOutputStream.close(); + } + + /** + * Test snapshot capturing open files and verify the same + * across NameNode restarts. + */ + @Test (timeout = 120000) + public void testSnapshotsForOpenFilesWithNNRestart() throws Exception { + // Construct the directory tree + final Path level0A = new Path("/level_0_A"); + final Path flumeSnapRootDir = level0A; + final String flumeFileName = "flume.log"; + final String flumeSnap1Name = "flume_snap_1"; + final String flumeSnap2Name = "flume_snap_2"; + + // Create files and open a stream + final Path flumeFile = new Path(level0A, flumeFileName); + createFile(flumeFile); + FSDataOutputStream flumeOutputStream = fs.append(flumeFile); + + // Create Snapshot S1 + final Path flumeS1Dir = SnapshotTestHelper.createSnapshot( + fs, flumeSnapRootDir, flumeSnap1Name); + final Path flumeS1Path = new Path(flumeS1Dir, flumeFileName); + final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen(); + + // Verify if Snap S1 file length is same as the the live one + Assert.assertEquals(flumeFileLengthAfterS1, + fs.getFileStatus(flumeS1Path).getLen()); + + long flumeFileWrittenDataLength = flumeFileLengthAfterS1; + int newWriteLength = (int) (BLOCKSIZE * 1.5); + byte[] buf = new byte[newWriteLength]; + Random random = new Random(); + random.nextBytes(buf); + + // Write more data to flume file + flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); + + // Create Snapshot S2 + final Path flumeS2Dir = SnapshotTestHelper.createSnapshot( + fs, flumeSnapRootDir, flumeSnap2Name); + final Path flumeS2Path = new Path(flumeS2Dir, flumeFileName); + + // Verify live files length is same as all data written till now + final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen(); + Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2); + + // Verify if Snap S2 file length is same as the live one + Assert.assertEquals(flumeFileLengthAfterS2, + fs.getFileStatus(flumeS2Path).getLen()); + + // Write more data to flume file + flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); + + // Verify old flume snapshots have point-in-time / frozen file lengths + // even after the live file have moved forward. + Assert.assertEquals(flumeFileLengthAfterS1, + fs.getFileStatus(flumeS1Path).getLen()); + Assert.assertEquals(flumeFileLengthAfterS2, + fs.getFileStatus(flumeS2Path).getLen()); + + // Restart the NameNode + restartNameNode(); + cluster.waitActive(); + + // Verify live file length hasn't changed after NN restart + Assert.assertEquals(flumeFileWrittenDataLength, + fs.getFileStatus(flumeFile).getLen()); + + // Verify old flume snapshots have point-in-time / frozen file lengths + // after NN restart and live file moved forward. 
+ Assert.assertEquals(flumeFileLengthAfterS1, + fs.getFileStatus(flumeS1Path).getLen()); + Assert.assertEquals(flumeFileLengthAfterS2, + fs.getFileStatus(flumeS2Path).getLen()); + + flumeOutputStream.close(); + } + private void restartNameNode() throws Exception { cluster.triggerBlockReports(); NameNode nameNode = cluster.getNameNode(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java index 453afac..b9451bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java @@ -22,20 +22,29 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.EnumSet; import java.util.HashMap; +import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -43,12 +52,13 @@ import org.junit.Test; * Tests snapshot deletion. 
  */
 public class TestSnapshotDiffReport {
-  protected static final long seed = 0;
-  protected static final short REPLICATION = 3;
-  protected static final short REPLICATION_1 = 2;
-  protected static final long BLOCKSIZE = 1024;
-  public static final int SNAPSHOTNUMBER = 10;
-
+  private static final long SEED = 0;
+  private static final short REPLICATION = 3;
+  private static final short REPLICATION_1 = 2;
+  private static final long BLOCKSIZE = 1024;
+  private static final long BUFFERLEN = BLOCKSIZE / 2;
+  private static final long FILELEN = BLOCKSIZE * 2;
+
   private final Path dir = new Path("/TestSnapshot");
   private final Path sub1 = new Path(dir, "sub1");
@@ -61,6 +71,8 @@ public class TestSnapshotDiffReport {
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
         .format(true).build();
     cluster.waitActive();
@@ -97,10 +109,10 @@ public class TestSnapshotDiffReport {
     Path link13 = new Path(modifyDir, "link13");
     Path file14 = new Path(modifyDir, "file14");
     Path file15 = new Path(modifyDir, "file15");
-    DFSTestUtil.createFile(hdfs, file10, BLOCKSIZE, REPLICATION_1, seed);
-    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION_1, seed);
-    DFSTestUtil.createFile(hdfs, file12, BLOCKSIZE, REPLICATION_1, seed);
-    DFSTestUtil.createFile(hdfs, file13, BLOCKSIZE, REPLICATION_1, seed);
+    DFSTestUtil.createFile(hdfs, file10, BLOCKSIZE, REPLICATION_1, SEED);
+    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION_1, SEED);
+    DFSTestUtil.createFile(hdfs, file12, BLOCKSIZE, REPLICATION_1, SEED);
+    DFSTestUtil.createFile(hdfs, file13, BLOCKSIZE, REPLICATION_1, SEED);
     // create link13
     hdfs.createSymlink(file13, link13, false);
     // create snapshot
@@ -118,9 +130,9 @@ public class TestSnapshotDiffReport {
     // delete link13
     hdfs.delete(link13, false);
     // create file14
-    DFSTestUtil.createFile(hdfs, file14, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, file14, BLOCKSIZE, REPLICATION, SEED);
     // create file15
-    DFSTestUtil.createFile(hdfs, file15, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, file15, BLOCKSIZE, REPLICATION, SEED);

     // create snapshot
     for (Path snapshotDir : snapshotDirs) {
@@ -128,7 +140,7 @@ public class TestSnapshotDiffReport {
     }

     // create file11 again
-    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION, SEED);
     // delete file12
     hdfs.delete(file12, true);
     // modify file13
@@ -386,8 +398,8 @@ public class TestSnapshotDiffReport {
     final Path fileInFoo = new Path(foo, "file");
     final Path bar = new Path(dir2, "bar");
     final Path fileInBar = new Path(bar, "file");
-    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
-    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, SEED);
+    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, SEED);

     // create snapshot on /dir1
     SnapshotTestHelper.createSnapshot(hdfs, dir1, "s0");
@@ -421,8 +433,8 @@ public class TestSnapshotDiffReport {
     final Path fileInFoo = new Path(foo, "file");
     final Path bar = new Path(dir2, "bar");
     final Path fileInBar = new Path(bar, "file");
-    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
-    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, SEED);
+    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, SEED);

     SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
     hdfs.rename(fileInFoo, fileInBar, Rename.OVERWRITE);
@@ -454,7 +466,7 @@ public class TestSnapshotDiffReport {
     final Path root = new Path("/");
     final Path foo = new Path(root, "foo");
     final Path fileInFoo = new Path(foo, "file");
-    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, SEED);
     SnapshotTestHelper.createSnapshot(hdfs, root, "s0");

     final Path bar = new Path(root, "bar");
@@ -478,7 +490,7 @@ public class TestSnapshotDiffReport {
   public void testDiffReportWithRenameAndAppend() throws Exception {
     final Path root = new Path("/");
     final Path foo = new Path(root, "foo");
-    DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPLICATION, SEED);
     SnapshotTestHelper.createSnapshot(hdfs, root, "s0");

     final Path bar = new Path(root, "bar");
@@ -504,7 +516,7 @@ public class TestSnapshotDiffReport {
     final Path root = new Path("/");
     final Path foo = new Path(root, "foo");
     final Path bar = new Path(foo, "bar");
-    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, SEED);
     SnapshotTestHelper.createSnapshot(hdfs, root, "s0");

     // rename /foo to /foo2
@@ -529,4 +541,140 @@ public class TestSnapshotDiffReport {
         new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("foo2/bar"),
             DFSUtil.string2Bytes("foo2/bar-new")));
   }
+
+  private void createFile(final Path filePath) throws IOException {
+    DFSTestUtil.createFile(hdfs, filePath, (int) BUFFERLEN,
+        FILELEN, BLOCKSIZE, REPLICATION, SEED);
+  }
+
+  private int writeToStream(final FSDataOutputStream outputStream,
+      byte[] buf) throws IOException {
+    outputStream.write(buf);
+    ((HdfsDataOutputStream)outputStream).hsync(
+        EnumSet.of(SyncFlag.UPDATE_LENGTH));
+    return buf.length;
+  }
+
+  private void restartNameNode() throws Exception {
+    cluster.triggerBlockReports();
+    NameNode nameNode = cluster.getNameNode();
+    NameNodeAdapter.enterSafeMode(nameNode, false);
+    NameNodeAdapter.saveNamespace(nameNode);
+    NameNodeAdapter.leaveSafeMode(nameNode);
+    cluster.restartNameNode(true);
+  }
+
+  /**
+   * Test Snapshot diff report for snapshots with open files captures in them.
+   * Also verify if the diff report remains the same across NameNode restarts.
+   */
+  @Test (timeout = 120000)
+  public void testDiffReportWithOpenFiles() throws Exception {
+    // Construct the directory tree
+    final Path level0A = new Path("/level_0_A");
+    final Path flumeSnapRootDir = level0A;
+    final String flumeFileName = "flume.log";
+    final String flumeSnap1Name = "flume_snap_1";
+    final String flumeSnap2Name = "flume_snap_2";
+
+    // Create files and open a stream
+    final Path flumeFile = new Path(level0A, flumeFileName);
+    createFile(flumeFile);
+    FSDataOutputStream flumeOutputStream = hdfs.append(flumeFile);
+
+    // Create Snapshot S1
+    final Path flumeS1Dir = SnapshotTestHelper.createSnapshot(
+        hdfs, flumeSnapRootDir, flumeSnap1Name);
+    final Path flumeS1Path = new Path(flumeS1Dir, flumeFileName);
+    final long flumeFileLengthAfterS1 = hdfs.getFileStatus(flumeFile).getLen();
+
+    // Verify if Snap S1 file length is same as the the live one
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        hdfs.getFileStatus(flumeS1Path).getLen());
+
+    verifyDiffReport(level0A, flumeSnap1Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")));
+
+    long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
+    int newWriteLength = (int) (BLOCKSIZE * 1.5);
+    byte[] buf = new byte[newWriteLength];
+    Random random = new Random();
+    random.nextBytes(buf);
+
+    // Write more data to flume file
+    flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+    // Create Snapshot S2
+    final Path flumeS2Dir = SnapshotTestHelper.createSnapshot(
+        hdfs, flumeSnapRootDir, flumeSnap2Name);
+    final Path flumeS2Path = new Path(flumeS2Dir, flumeFileName);
+
+    // Verify live files length is same as all data written till now
+    final long flumeFileLengthAfterS2 = hdfs.getFileStatus(flumeFile).getLen();
+    Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
+
+    // Verify if Snap S2 file length is same as the live one
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        hdfs.getFileStatus(flumeS2Path).getLen());
+
+    verifyDiffReport(level0A, flumeSnap1Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    verifyDiffReport(level0A, flumeSnap2Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")));
+
+    verifyDiffReport(level0A, flumeSnap1Name, flumeSnap2Name,
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    // Write more data to flume file
+    flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+    // Verify old flume snapshots have point-in-time / frozen file lengths
+    // even after the live file have moved forward.
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        hdfs.getFileStatus(flumeS1Path).getLen());
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        hdfs.getFileStatus(flumeS2Path).getLen());
+
+    flumeOutputStream.close();
+
+    // Verify if Snap S2 file length is same as the live one
+    Assert.assertEquals(flumeFileWrittenDataLength,
+        hdfs.getFileStatus(flumeFile).getLen());
+
+    // Verify old flume snapshots have point-in-time / frozen file lengths
+    // even after the live file have moved forward.
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        hdfs.getFileStatus(flumeS1Path).getLen());
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        hdfs.getFileStatus(flumeS2Path).getLen());
+
+    verifyDiffReport(level0A, flumeSnap1Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    verifyDiffReport(level0A, flumeSnap2Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    verifyDiffReport(level0A, flumeSnap1Name, flumeSnap2Name,
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    restartNameNode();
+
+    verifyDiffReport(level0A, flumeSnap1Name, flumeSnap2Name,
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
index be14305..fd35388 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
@@ -23,11 +23,13 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,25 +48,26 @@ public class TestSnapshotManager {
   public void testSnapshotLimits() throws Exception {
     // Setup mock objects for SnapshotManager.createSnapshot.
     //
+    LeaseManager leaseManager = mock(LeaseManager.class);
     INodeDirectory ids = mock(INodeDirectory.class);
     FSDirectory fsdir = mock(FSDirectory.class);
     INodesInPath iip = mock(INodesInPath.class);
-    SnapshotManager sm = spy(new SnapshotManager(fsdir));
+    SnapshotManager sm = spy(new SnapshotManager(new Configuration(), fsdir));
     doReturn(ids).when(sm).getSnapshottableRoot((INodesInPath) anyObject());
     doReturn(testMaxSnapshotLimit).when(sm).getMaxSnapshotID();

     // Create testMaxSnapshotLimit snapshots. These should all succeed.
     //
     for (Integer i = 0; i < testMaxSnapshotLimit; ++i) {
-      sm.createSnapshot(iip, "dummy", i.toString());
+      sm.createSnapshot(leaseManager, iip, "dummy", i.toString());
     }

     // Attempt to create one more snapshot. This should fail due to snapshot
     // ID rollover.
     //
     try {
-      sm.createSnapshot(iip, "dummy", "shouldFailSnapshot");
+      sm.createSnapshot(leaseManager, iip, "dummy", "shouldFailSnapshot");
       Assert.fail("Expected SnapshotException not thrown");
     } catch (SnapshotException se) {
       Assert.assertTrue(
@@ -79,7 +82,7 @@ public class TestSnapshotManager {
     // to snapshot ID rollover.
     //
     try {
-      sm.createSnapshot(iip, "dummy", "shouldFailSnapshot2");
+      sm.createSnapshot(leaseManager, iip, "dummy", "shouldFailSnapshot2");
       Assert.fail("Expected SnapshotException not thrown");
     } catch (SnapshotException se) {
       Assert.assertTrue(

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
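
Editor's sketch: a minimal, self-contained illustration of the behavior the TestSnapshotDiffReport change above exercises, written against a MiniDFSCluster. Only the config key DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES and the public HDFS client calls are taken from the patch; the class name, paths, and sizes below are assumptions for illustration, not part of the commit.

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

public class OpenFileSnapshotSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Opt in: snapshots record the current length of files still open for write.
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);

    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();

      // Illustrative paths, not taken from the patch.
      Path dir = new Path("/level_0_A");
      Path file = new Path(dir, "flume.log");
      DFSTestUtil.createFile(fs, file, 1024L, (short) 1, 0L);

      // Keep the file open for append while the snapshot is taken.
      FSDataOutputStream out = fs.append(file);
      out.write(new byte[512]);
      ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));

      fs.allowSnapshot(dir);
      Path snapDir = fs.createSnapshot(dir, "flume_snap_1");
      long frozenLen = fs.getFileStatus(new Path(snapDir, "flume.log")).getLen();

      // Further writes move the live file forward, but not the snapshot copy.
      out.write(new byte[512]);
      out.close();

      System.out.println("snapshot length = " + frozenLen
          + ", live length = " + fs.getFileStatus(file).getLen());
    } finally {
      cluster.shutdown();
    }
  }
}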
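The diff reports that the test asserts through its verifyDiffReport helper can also be fetched directly with the public DistributedFileSystem API; passing an empty string as the "to" snapshot compares against the current tree, which is how the test's calls with "" behave. The helper name printDiff and the snapshot names below are illustrative assumptions, not part of the patch.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;

class SnapshotDiffSketch {
  // Print every entry recorded between two snapshots of snapRoot.
  static void printDiff(DistributedFileSystem fs, Path snapRoot)
      throws Exception {
    SnapshotDiffReport report =
        fs.getSnapshotDiffReport(snapRoot, "flume_snap_1", "flume_snap_2");
    for (DiffReportEntry entry : report.getDiffList()) {
      // With an open file captured in both snapshots, a MODIFY entry for the
      // file is expected here, matching what the test above verifies.
      System.out.println(entry);
    }
  }
}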