HDFS-11402. HDFS Snapshots should capture point-in-time copies of OPEN files. 
(Manoj Govindassamy via Yongjun Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/20e3ae26
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/20e3ae26
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/20e3ae26

Branch: refs/heads/YARN-2915
Commit: 20e3ae260b40cd6ef657b2a629a02219d68f162f
Parents: 3721cfe
Author: Yongjun Zhang <yzh...@cloudera.com>
Authored: Fri Apr 21 20:19:20 2017 -0700
Committer: Yongjun Zhang <yzh...@cloudera.com>
Committed: Fri Apr 21 20:35:48 2017 -0700

----------------------------------------------------------------------
 .../hdfs/client/HdfsClientConfigKeys.java       |   4 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 .../hdfs/server/namenode/FSDirSnapshotOp.java   |   7 +-
 .../hdfs/server/namenode/FSEditLogLoader.java   |   5 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   2 +-
 .../hdfs/server/namenode/INodeDirectory.java    |   8 +-
 .../hdfs/server/namenode/INodesInPath.java      |  78 ++++-
 .../hdfs/server/namenode/LeaseManager.java      | 103 ++++++-
 .../snapshot/DirectorySnapshottableFeature.java |  15 +-
 .../namenode/snapshot/SnapshotManager.java      |  24 +-
 .../src/main/resources/hdfs-default.xml         |  17 ++
 .../hdfs/server/namenode/TestLeaseManager.java  | 286 +++++++++++++++++-
 .../snapshot/TestOpenFilesWithSnapshot.java     | 298 +++++++++++++++++++
 .../snapshot/TestSnapshotDiffReport.java        | 188 ++++++++++--
 .../namenode/snapshot/TestSnapshotManager.java  |  11 +-
 15 files changed, 994 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
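
This commit adds an opt-in NameNode setting, dfs.namenode.snapshot.capture.openfiles, that makes a snapshot freeze the current NameNode-recorded length of files still open for write under the snapshot root. The sketch below is an illustrative end-to-end walk-through, not part of the patch; it reuses the MiniDFSCluster harness that the new tests use, and the class name, paths, and sizes are made up.

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

public class OpenFileSnapshotExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // New key introduced by this patch; the feature is off by default.
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();
      Path dir = new Path("/logs");
      fs.mkdirs(dir);
      fs.allowSnapshot(dir);

      Path file = new Path(dir, "app.log");
      FSDataOutputStream out = fs.create(file);
      out.write(new byte[1024]);
      // Push the current length to the NameNode so the snapshot can see it.
      ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));

      Path snapDir = fs.createSnapshot(dir, "s1");   // open file length frozen here
      out.write(new byte[4096]);                     // live file keeps growing
      out.close();

      long snapLen = fs.getFileStatus(new Path(snapDir, "app.log")).getLen(); // 1024
      long liveLen = fs.getFileStatus(file).getLen();                         // 5120
      System.out.println("snapshot=" + snapLen + " live=" + liveLen);
    } finally {
      cluster.shutdown();
    }
  }
}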


http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index c152a4b..fbc8d89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -171,6 +171,10 @@ public interface HdfsClientConfigKeys {
       "dfs.data.transfer.client.tcpnodelay";
   boolean DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT = true;
 
+  String DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES =
+      "dfs.namenode.snapshot.capture.openfiles";
+  boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false;
+
   /**
    * These are deprecated config keys to client code.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6ff7e5a..3fa383b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -352,6 +352,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY = "dfs.namenode.startup.delay.block.deletion.sec";
   public static final long    DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT = 0L;
 
+  public static final String DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES =
+      HdfsClientConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES;
+  public static final boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT =
+      HdfsClientConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT;
+
   // Whether to enable datanode's stale state detection and usage for reads
   public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
   public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
index ff076e4..9dd75bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
@@ -97,12 +97,13 @@ class FSDirSnapshotOp {
       throw new InvalidPathException("Invalid snapshot name: " + snapshotName);
     }
 
-    String snapshotPath = null;
+    String snapshotPath;
     verifySnapshotName(fsd, snapshotName, snapshotRoot);
     fsd.writeLock();
     try {
-      snapshotPath = snapshotManager.createSnapshot(iip, snapshotRoot,
-          snapshotName);
+      snapshotPath = snapshotManager.createSnapshot(
+          fsd.getFSNamesystem().getLeaseManager(),
+          iip, snapshotRoot, snapshotName);
     } finally {
       fsd.writeUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index ff36f18..ae0b304 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -746,8 +746,9 @@ public class FSEditLogLoader {
           renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot,
               logVersion);
       INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
-      String path = fsNamesys.getSnapshotManager().createSnapshot(iip,
-          snapshotRoot, createSnapshotOp.snapshotName);
+      String path = fsNamesys.getSnapshotManager().createSnapshot(
+          fsDir.getFSNamesystem().getLeaseManager(),
+          iip, snapshotRoot, createSnapshotOp.snapshotName);
       if (toAddRetryCache) {
         fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
             createSnapshotOp.rpcCallId, path);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e24778f..3dbfdf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -844,7 +844,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(this, conf);
-      this.snapshotManager = new SnapshotManager(dir);
+      this.snapshotManager = new SnapshotManager(conf, dir);
       this.cacheManager = new CacheManager(this, conf, blockManager);
       this.ecPolicyManager = new ErasureCodingPolicyManager(conf);
       this.topConf = new TopConf(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index b6e2713..a4098bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -256,9 +256,11 @@ public class INodeDirectory extends INodeWithAdditionalFields
     getDirectorySnapshottableFeature().setSnapshotQuota(snapshotQuota);
   }
 
-  public Snapshot addSnapshot(int id, String name) throws SnapshotException,
-      QuotaExceededException {
-    return getDirectorySnapshottableFeature().addSnapshot(this, id, name);
+  public Snapshot addSnapshot(int id, String name,
+      final LeaseManager leaseManager, final boolean captureOpenFiles)
+      throws SnapshotException, QuotaExceededException {
+    return getDirectorySnapshottableFeature().addSnapshot(this, id, name,
+        leaseManager, captureOpenFiles);
   }
 
   public Snapshot removeSnapshot(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
index 1d5dbf6..abc8b63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
@@ -49,24 +49,62 @@ public class INodesInPath {
         Arrays.equals(HdfsServerConstants.DOT_SNAPSHOT_DIR_BYTES, pathComponent);
   }
 
-  static INodesInPath fromINode(INode inode) {
+  private static INode[] getINodes(final INode inode) {
     int depth = 0, index;
     INode tmp = inode;
     while (tmp != null) {
       depth++;
       tmp = tmp.getParent();
     }
-    final byte[][] path = new byte[depth][];
-    final INode[] inodes = new INode[depth];
+    INode[] inodes = new INode[depth];
     tmp = inode;
     index = depth;
     while (tmp != null) {
       index--;
-      path[index] = tmp.getKey();
       inodes[index] = tmp;
       tmp = tmp.getParent();
     }
-    return new INodesInPath(inodes, path);
+    return inodes;
+  }
+
+  private static byte[][] getPaths(final INode[] inodes) {
+    byte[][] paths = new byte[inodes.length][];
+    for (int i = 0; i < inodes.length; i++) {
+      paths[i] = inodes[i].getKey();
+    }
+    return paths;
+  }
+
+  /**
+   * Construct {@link INodesInPath} from {@link INode}.
+   *
+   * @param inode to construct from
+   * @return INodesInPath
+   */
+  static INodesInPath fromINode(INode inode) {
+    INode[] inodes = getINodes(inode);
+    byte[][] paths = getPaths(inodes);
+    return new INodesInPath(inodes, paths);
+  }
+
+  /**
+   * Construct {@link INodesInPath} from {@link INode} and its root
+   * {@link INodeDirectory}. INodesInPath constructed this way will
+   * each have its snapshot and latest snapshot id filled in.
+   *
+   * This routine is specifically for
+   * {@link LeaseManager#getINodeWithLeases(INodeDirectory)} to get
+   * open files along with their snapshot details which is used during
+   * new snapshot creation to capture their meta data.
+   *
+   * @param rootDir the root {@link INodeDirectory} under which inode
+   *                needs to be resolved
+   * @param inode the {@link INode} to be resolved
+   * @return INodesInPath
+   */
+  static INodesInPath fromINode(final INodeDirectory rootDir, INode inode) {
+    byte[][] paths = getPaths(getINodes(inode));
+    return resolve(rootDir, paths);
   }
 
   static INodesInPath fromComponents(byte[][] components) {
@@ -382,6 +420,36 @@ public class INodesInPath {
   }
 
   /**
+   * Verify if this {@link INodesInPath} is a descendant of the
+   * requested {@link INodeDirectory}.
+   *
+   * @param inodeDirectory the ancestor directory
+   * @return true if this INodesInPath is a descendant of inodeDirectory
+   */
+  public boolean isDescendant(final INodeDirectory inodeDirectory) {
+    final INodesInPath dirIIP = fromINode(inodeDirectory);
+    return isDescendant(dirIIP);
+  }
+
+  private boolean isDescendant(final INodesInPath ancestorDirIIP) {
+    int ancestorDirINodesLength = ancestorDirIIP.length();
+    int myParentINodesLength = length() - 1;
+    if (myParentINodesLength < ancestorDirINodesLength) {
+      return false;
+    }
+
+    int index = 0;
+    while (index < ancestorDirINodesLength) {
+      if (inodes[index] != ancestorDirIIP.getINode(index)) {
+        return false;
+      }
+      index++;
+    }
+    return true;
+  }
+
+
+  /**
    * @return a new INodesInPath instance that only contains existing INodes.
    * Note that this method only handles non-snapshot paths.
    */
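
The new fromINode(rootDir, inode) overload and isDescendant(...) let the LeaseManager resolve each open file against a snapshot root and keep only the files below it. The descendant check is a prefix comparison of the resolved inode arrays: the ancestor's inodes must match the leading inodes of the file's parent path. A stand-alone sketch of the same idea over plain path components, purely for illustration (not HDFS API):

// Illustrative version of the prefix test in INodesInPath#isDescendant:
// a path a/b/c/file.log is a descendant of a/b iff the ancestor's
// components are a prefix of the file's parent components.
static boolean isDescendant(String[] ancestorComponents, String[] pathComponents) {
  // The last component is the file itself; only its parents are compared.
  if (pathComponents.length - 1 < ancestorComponents.length) {
    return false;
  }
  for (int i = 0; i < ancestorComponents.length; i++) {
    if (!pathComponents[i].equals(ancestorComponents[i])) {
      return false;
    }
  }
  return true;
}

// Example: isDescendant({"a","b"}, {"a","b","c","file.log"}) -> true
//          isDescendant({"a","x"}, {"a","b","c","file.log"}) -> false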

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
index a9fb24b..f78eef3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
@@ -28,9 +28,15 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -41,6 +47,7 @@ import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Time;
 
 /**
  * LeaseManager does the lease housekeeping for writing on files.   
@@ -67,16 +74,14 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class LeaseManager {
   public static final Log LOG = LogFactory.getLog(LeaseManager.class);
-
   private final FSNamesystem fsnamesystem;
-
   private long softLimit = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
   private long hardLimit = HdfsConstants.LEASE_HARDLIMIT_PERIOD;
+  static final int INODE_FILTER_WORKER_COUNT_MAX = 4;
+  static final int INODE_FILTER_WORKER_TASK_MIN = 512;
 
-  //
   // Used for handling lock-leases
   // Mapping: leaseHolder -> Lease
-  //
   private final SortedMap<String, Lease> leases = new TreeMap<>();
   // Set of: Lease
   private final PriorityQueue<Lease> sortedLeases = new PriorityQueue<>(512,
@@ -129,6 +134,96 @@ public class LeaseManager {
 
   Collection<Long> getINodeIdWithLeases() {return leasesById.keySet();}
 
+  /**
+   * Get {@link INodesInPath} for all {@link INode} in the system
+   * which has a valid lease.
+   *
+   * @return Set<INodesInPath>
+   */
+  public Set<INodesInPath> getINodeWithLeases() {
+    return getINodeWithLeases(null);
+  }
+
+  private synchronized INode[] getINodesWithLease() {
+    int inodeCount = 0;
+    INode[] inodes = new INode[leasesById.size()];
+    for (long inodeId : leasesById.keySet()) {
+      inodes[inodeCount] = fsnamesystem.getFSDirectory().getInode(inodeId);
+      inodeCount++;
+    }
+    return inodes;
+  }
+
+  /**
+   * Get {@link INodesInPath} for all files under the ancestor directory which
+   * has valid lease. If the ancestor directory is null, then return all files
+   * in the system with valid lease. Callers must hold {@link FSNamesystem}
+   * read or write lock.
+   *
+   * @param ancestorDir the ancestor {@link INodeDirectory}
+   * @return Set<INodesInPath>
+   */
+  public Set<INodesInPath> getINodeWithLeases(final INodeDirectory
+      ancestorDir) {
+    assert fsnamesystem.hasReadLock();
+    final long startTimeMs = Time.monotonicNow();
+    Set<INodesInPath> iipSet = new HashSet<>();
+    final INode[] inodes = getINodesWithLease();
+    int inodeCount = inodes.length;
+    if (inodeCount == 0) {
+      return iipSet;
+    }
+
+    List<Future<List<INodesInPath>>> futureList = Lists.newArrayList();
+    final int workerCount = Math.min(INODE_FILTER_WORKER_COUNT_MAX,
+        (((inodeCount - 1) / INODE_FILTER_WORKER_TASK_MIN) + 1));
+    ExecutorService inodeFilterService =
+        Executors.newFixedThreadPool(workerCount);
+    for (int workerIdx = 0; workerIdx < workerCount; workerIdx++) {
+      final int startIdx = workerIdx;
+      Callable<List<INodesInPath>> c = new Callable<List<INodesInPath>>() {
+        @Override
+        public List<INodesInPath> call() {
+          List<INodesInPath> iNodesInPaths = Lists.newArrayList();
+          for (int idx = startIdx; idx < inodeCount; idx += workerCount) {
+            INode inode = inodes[idx];
+            if (!inode.isFile()) {
+              continue;
+            }
+            INodesInPath inodesInPath = INodesInPath.fromINode(
+                fsnamesystem.getFSDirectory().getRoot(), inode.asFile());
+            if (ancestorDir != null &&
+                !inodesInPath.isDescendant(ancestorDir)) {
+              continue;
+            }
+            iNodesInPaths.add(inodesInPath);
+          }
+          return iNodesInPaths;
+        }
+      };
+
+      // Submit the inode filter task to the Executor Service
+      futureList.add(inodeFilterService.submit(c));
+    }
+    inodeFilterService.shutdown();
+
+    for (Future<List<INodesInPath>> f : futureList) {
+      try {
+        iipSet.addAll(f.get());
+      } catch (Exception e) {
+        LOG.warn("INode filter task encountered exception: ", e);
+      }
+    }
+    final long endTimeMs = Time.monotonicNow();
+    if ((endTimeMs - startTimeMs) > 1000) {
+      LOG.info("Took " + (endTimeMs - startTimeMs) + " ms to collect "
+          + iipSet.size() + " open files with leases" +
+          ((ancestorDir != null) ?
+              " under " + ancestorDir.getFullPathName() : "."));
+    }
+    return iipSet;
+  }
+
   /** @return the lease containing src */
   public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());}
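
getINodeWithLeases(ancestorDir) filters the leased inodes in parallel: with INODE_FILTER_WORKER_TASK_MIN = 512 and INODE_FILTER_WORKER_COUNT_MAX = 4, the pool size is min(4, ((inodeCount - 1) / 512) + 1), so up to 512 open files are handled by a single thread and the pool never exceeds four workers, each taking a strided slice of the inode array. A small illustrative sketch of the sizing and striding (the count and loop body are made up, mirroring the code above):

// Worker sizing and strided partitioning as in LeaseManager#getINodeWithLeases.
int inodeCount = 1279;  // e.g. number of files currently under lease
int workerCount = Math.min(4, ((inodeCount - 1) / 512) + 1);  // -> 3 workers
for (int worker = 0; worker < workerCount; worker++) {
  // Worker k visits indices k, k + workerCount, k + 2 * workerCount, ...
  for (int idx = worker; idx < inodeCount; idx += workerCount) {
    // keep inodes[idx] if it is a file and lies under the snapshot root
  }
}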
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
index fa7bace..3039ad3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -39,6 +40,8 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
+import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.Time;
@@ -163,7 +166,8 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature
   }
 
   /** Add a snapshot. */
-  public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name)
+  public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name,
+      final LeaseManager leaseManager, final boolean captureOpenFiles)
       throws SnapshotException, QuotaExceededException {
     //check snapshot quota
     final int n = getNumSnapshots();
@@ -188,6 +192,15 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature
     final long now = Time.now();
     snapshotRoot.updateModificationTime(now, Snapshot.CURRENT_STATE_ID);
     s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
+
+    if (captureOpenFiles) {
+      Set<INodesInPath> openFilesIIP =
+          leaseManager.getINodeWithLeases(snapshotRoot);
+      for (INodesInPath openFileIIP : openFilesIIP)  {
+        INodeFile openFile = openFileIIP.getLastINode().asFile();
+        openFile.recordModification(openFileIIP.getLatestSnapshotId());
+      }
+    }
     return s;
   }
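
When captureOpenFiles is enabled, addSnapshot asks the LeaseManager for every open file under the snapshot root and calls recordModification(...) on it, so the file's current metadata, notably its NameNode-recorded length, is frozen in the new snapshot. One visible consequence, exercised by the updated TestSnapshotDiffReport, is that an open file which keeps growing shows up in the snapshot diff like any other changed file. A hedged illustration using the public snapshot-diff API (the /data path, snapshot names, and the appending writer are assumptions for the example):

// Assumes a DistributedFileSystem `fs`, a snapshottable directory /data,
// and a client appending to /data/app.log between the two snapshots with
// hsync(SyncFlag.UPDATE_LENGTH) so the NameNode sees the new length.
Path dataDir = new Path("/data");
fs.createSnapshot(dataDir, "s1");
// ... open writer appends to /data/app.log and syncs its length ...
fs.createSnapshot(dataDir, "s2");
SnapshotDiffReport report = fs.getSnapshotDiffReport(dataDir, "s1", "s2");
// With capture enabled, the still-open app.log is reported as modified.
System.out.println(report);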
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
index 8ad7824..ffc203f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -29,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -43,6 +47,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.metrics2.util.MBeans;
 
 import com.google.common.base.Preconditions;
@@ -60,20 +65,23 @@ import com.google.common.base.Preconditions;
  * if necessary.
  */
 public class SnapshotManager implements SnapshotStatsMXBean {
-  private boolean allowNestedSnapshots = false;
   private final FSDirectory fsdir;
-  private static final int SNAPSHOT_ID_BIT_WIDTH = 24;
-
+  private final boolean captureOpenFiles;
   private final AtomicInteger numSnapshots = new AtomicInteger();
+  private static final int SNAPSHOT_ID_BIT_WIDTH = 24;
 
+  private boolean allowNestedSnapshots = false;
   private int snapshotCounter = 0;
   
   /** All snapshottable directories in the namesystem. */
   private final Map<Long, INodeDirectory> snapshottables =
       new HashMap<Long, INodeDirectory>();
 
-  public SnapshotManager(final FSDirectory fsdir) {
+  public SnapshotManager(final Configuration conf, final FSDirectory fsdir) {
     this.fsdir = fsdir;
+    this.captureOpenFiles = conf.getBoolean(
+        DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES,
+        DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT);
   }
 
   /** Used in tests only */
@@ -203,8 +211,9 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    *           snapshot with the given name for the directory, and/or 3)
    *           snapshot number exceeds quota
    */
-  public String createSnapshot(final INodesInPath iip, String snapshotRoot,
-      String snapshotName) throws IOException {
+  public String createSnapshot(final LeaseManager leaseManager,
+      final INodesInPath iip, String snapshotRoot, String snapshotName)
+      throws IOException {
     INodeDirectory srcRoot = getSnapshottableRoot(iip);
 
     if (snapshotCounter == getMaxSnapshotID()) {
@@ -216,7 +225,8 @@ public class SnapshotManager implements SnapshotStatsMXBean {
           "snapshot IDs and ID rollover is not supported.");
     }
 
-    srcRoot.addSnapshot(snapshotCounter, snapshotName);
+    srcRoot.addSnapshot(snapshotCounter, snapshotName, leaseManager,
+        this.captureOpenFiles);
       
     //create success, update id
     snapshotCounter++;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 5365cac..7fcea01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4154,6 +4154,23 @@
   </description>
 </property>
 
+  <property>
+    <name>dfs.namenode.snapshot.capture.openfiles</name>
+    <value>false</value>
+    <description>
+      If true, snapshots taken will have an immutable shared copy of
+      the open files that have valid leases. Even after the open files
+      grow or shrink in size, the snapshot will always have the previous
+      point-in-time version of the open files, just like all other
+      closed files. Default is false.
+      Note: The file length captured for open files in the snapshot is
+      what is recorded in the NameNode at the time of the snapshot, and
+      it may be shorter than what the client has written until then. In
+      order to capture the latest length, the client can call hflush/hsync
+      with the flag SyncFlag.UPDATE_LENGTH on the open file handles.
+    </description>
+  </property>
+
 <property>
   <name>dfs.pipeline.ecn</name>
   <value>false</value>
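
As the description above notes, the length frozen in a snapshot is whatever the NameNode knows at snapshot time, which can lag behind what the client has written. A small illustrative client fragment (the out stream and buffer are assumptions) showing how a writer pushes its current length to the NameNode before a snapshot is taken:

// out was returned by DistributedFileSystem#create or #append.
// A plain hflush()/hsync() makes the data visible/durable on DataNodes but
// does not update the NameNode-recorded length; UPDATE_LENGTH does.
out.write(buffer);
((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));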

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
index 6692090..9adb071 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
@@ -20,24 +20,33 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.*;
@@ -69,19 +78,21 @@ public class TestLeaseManager {
   /** Check that LeaseManager.checkLease release some leases
    */
   @Test
-  public void testCheckLease() {
+  public void testCheckLease() throws InterruptedException {
     LeaseManager lm = new LeaseManager(makeMockFsNameSystem());
-
-    long numLease = 100;
+    final long numLease = 100;
+    final long expiryTime = 0;
+    final long waitTime = expiryTime + 1;
 
     //Make sure the leases we are going to add exceed the hard limit
-    lm.setLeasePeriod(0, 0);
+    lm.setLeasePeriod(expiryTime, expiryTime);
 
     for (long i = 0; i <= numLease - 1; i++) {
       //Add some leases to the LeaseManager
       lm.addLease("holder"+i, INodeId.ROOT_INODE_ID + i);
     }
     assertEquals(numLease, lm.countLease());
+    Thread.sleep(waitTime);
 
     //Initiate a call to checkLease. This should exit within the test timeout
     lm.checkLeases();
@@ -156,10 +167,271 @@ public class TestLeaseManager {
     }
   }
 
+  /**
+   * Test leased files counts from
+   * {@link LeaseManager#getINodeWithLeases()},
+   * {@link LeaseManager#getINodeIdWithLeases()} and
+   * {@link LeaseManager#getINodeWithLeases(INodeDirectory)}.
+   */
+  @Test (timeout = 60000)
+  public void testInodeWithLeases() throws Exception {
+    FSNamesystem fsNamesystem = makeMockFsNameSystem();
+    FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
+    LeaseManager lm = new LeaseManager(fsNamesystem);
+    Set<Long> iNodeIds = new HashSet<>(Arrays.asList(
+        INodeId.ROOT_INODE_ID + 1,
+        INodeId.ROOT_INODE_ID + 2,
+        INodeId.ROOT_INODE_ID + 3,
+        INodeId.ROOT_INODE_ID + 4
+        ));
+    final PermissionStatus perm = PermissionStatus.createImmutable(
+        "user", "group", FsPermission.createImmutable((short)0755));
+    INodeDirectory rootInodeDirectory = new INodeDirectory(
+        HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
+        perm, 0L);
+    when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
+    verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
+
+    for (Long iNodeId : iNodeIds) {
+      INodeFile iNodeFile = stubInodeFile(iNodeId);
+      iNodeFile.setParent(rootInodeDirectory);
+      when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
+      lm.addLease("holder_" + iNodeId, iNodeId);
+    }
+    verifyINodeLeaseCounts(lm, rootInodeDirectory, iNodeIds.size(),
+        iNodeIds.size(), iNodeIds.size());
+
+    for (Long iNodeId : iNodeIds) {
+      lm.removeLease(iNodeId);
+    }
+    verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
+  }
+
+  /**
+   * Test leased files counts at various scale from
+   * {@link LeaseManager#getINodeWithLeases()},
+   * {@link LeaseManager#getINodeIdWithLeases()} and
+   * {@link LeaseManager#getINodeWithLeases(INodeDirectory)}.
+   */
+  @Test (timeout = 240000)
+  public void testInodeWithLeasesAtScale() throws Exception {
+    FSNamesystem fsNamesystem = makeMockFsNameSystem();
+    FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
+    LeaseManager lm = new LeaseManager(fsNamesystem);
+
+    final PermissionStatus perm = PermissionStatus.createImmutable(
+        "user", "group", FsPermission.createImmutable((short)0755));
+    INodeDirectory rootInodeDirectory = new INodeDirectory(
+        HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
+        perm, 0L);
+    when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
+
+    // Case 1: No open files
+    int scale = 0;
+    testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
+
+    for (int workerCount = 1;
+         workerCount <= LeaseManager.INODE_FILTER_WORKER_COUNT_MAX / 2;
+         workerCount++) {
+      // Case 2: Open files count is half of worker task size
+      scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN / 2;
+      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+          rootInodeDirectory, scale);
+
+      // Case 3: Open files count is 1 less of worker task size
+      scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN - 1;
+      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+          rootInodeDirectory, scale);
+
+      // Case 4: Open files count is equal to worker task size
+      scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN;
+      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+          rootInodeDirectory, scale);
+
+      // Case 5: Open files count is 1 more than worker task size
+      scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN + 1;
+      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+          rootInodeDirectory, scale);
+    }
+
+    // Case 6: Open files count is way more than worker count
+    scale = 1279;
+    testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
+  }
+
+  private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager,
+      final FSDirectory fsDirectory, INodeDirectory ancestorDirectory,
+      int scale) {
+    verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
+
+    Set<Long> iNodeIds = new HashSet<>();
+    for (int i = 0; i < scale; i++) {
+      iNodeIds.add(INodeId.ROOT_INODE_ID + i);
+    }
+    for (Long iNodeId : iNodeIds) {
+      INodeFile iNodeFile = stubInodeFile(iNodeId);
+      iNodeFile.setParent(ancestorDirectory);
+      when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
+      leaseManager.addLease("holder_" + iNodeId, iNodeId);
+    }
+    verifyINodeLeaseCounts(leaseManager, ancestorDirectory, iNodeIds.size(),
+        iNodeIds.size(), iNodeIds.size());
+
+    leaseManager.removeAllLeases();
+    verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
+  }
+
+  /**
+   * Verify leased INode details across lease get and release from
+   * {@link LeaseManager#getINodeIdWithLeases()} and
+   * {@link LeaseManager#getINodeWithLeases(INodeDirectory)}.
+   */
+  @Test (timeout = 60000)
+  public void testInodeWithLeasesForAncestorDir() throws Exception {
+    FSNamesystem fsNamesystem = makeMockFsNameSystem();
+    FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
+    LeaseManager lm = new LeaseManager(fsNamesystem);
+
+    final PermissionStatus perm = PermissionStatus.createImmutable(
+        "user", "group", FsPermission.createImmutable((short)0755));
+    INodeDirectory rootInodeDirectory = new INodeDirectory(
+        HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
+        perm, 0L);
+    when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
+
+    AtomicInteger inodeIds = new AtomicInteger(
+        (int) (HdfsConstants.GRANDFATHER_INODE_ID + 1234));
+    String[] pathTree = new String[] {
+        "/root.log",
+        "/ENG/a/a1.log",
+        "/ENG/a/b/b1.log",
+        "/ENG/a/b/c/c1.log",
+        "/ENG/a/b/c/c2.log",
+        "/OPS/m/m1.log",
+        "/OPS/m/n/n1.log",
+        "/OPS/m/n/n2.log"
+    };
+    Map<String, INode> pathINodeMap = createINodeTree(rootInodeDirectory,
+        pathTree, inodeIds);
+
+    assertEquals(0, lm.getINodeIdWithLeases().size());
+    for (Entry<String, INode> entry : pathINodeMap.entrySet()) {
+      long iNodeId = entry.getValue().getId();
+      when(fsDirectory.getInode(iNodeId)).thenReturn(entry.getValue());
+      if (entry.getKey().contains("log")) {
+        lm.addLease("holder_" + iNodeId, iNodeId);
+      }
+    }
+    assertEquals(pathTree.length, lm.getINodeIdWithLeases().size());
+    assertEquals(pathTree.length, lm.getINodeWithLeases().size());
+    assertEquals(pathTree.length, lm.getINodeWithLeases(
+        rootInodeDirectory).size());
+
+    // reset
+    lm.removeAllLeases();
+
+    Set<String> filesLeased = new HashSet<>(
+        Arrays.asList("root.log", "a1.log", "c1.log", "n2.log"));
+    for (String fileName : filesLeased) {
+      lm.addLease("holder", pathINodeMap.get(fileName).getId());
+    }
+    assertEquals(filesLeased.size(), lm.getINodeIdWithLeases().size());
+    assertEquals(filesLeased.size(), lm.getINodeWithLeases().size());
+    Set<INodesInPath> iNodeWithLeases = lm.getINodeWithLeases();
+    for (INodesInPath iNodesInPath : iNodeWithLeases) {
+      String leasedFileName = DFSUtil.bytes2String(
+          iNodesInPath.getLastLocalName());
+      assertTrue(filesLeased.contains(leasedFileName));
+    }
+
+    assertEquals(filesLeased.size(),
+        lm.getINodeWithLeases(rootInodeDirectory).size());
+    assertEquals(filesLeased.size() - 2,
+        lm.getINodeWithLeases(pathINodeMap.get("ENG").asDirectory()).size());
+    assertEquals(filesLeased.size() - 2,
+        lm.getINodeWithLeases(pathINodeMap.get("a").asDirectory()).size());
+    assertEquals(filesLeased.size() - 3,
+        lm.getINodeWithLeases(pathINodeMap.get("c").asDirectory()).size());
+    assertEquals(filesLeased.size() - 3,
+        lm.getINodeWithLeases(pathINodeMap.get("OPS").asDirectory()).size());
+    assertEquals(filesLeased.size() - 3,
+        lm.getINodeWithLeases(pathINodeMap.get("n").asDirectory()).size());
+
+    lm.removeLease(pathINodeMap.get("n2.log").getId());
+    assertEquals(filesLeased.size() - 1,
+        lm.getINodeWithLeases(rootInodeDirectory).size());
+    assertEquals(filesLeased.size() - 4,
+        lm.getINodeWithLeases(pathINodeMap.get("n").asDirectory()).size());
+
+    lm.removeAllLeases();
+    filesLeased.clear();
+    assertEquals(filesLeased.size(),
+        lm.getINodeWithLeases(rootInodeDirectory).size());
+
+  }
+
+  private void verifyINodeLeaseCounts(final LeaseManager leaseManager,
+      INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount,
+      int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) {
+    assertEquals(iNodeIdWithLeaseCount,
+        leaseManager.getINodeIdWithLeases().size());
+    assertEquals(iNodeWithLeaseCount,
+        leaseManager.getINodeWithLeases().size());
+    assertEquals(iNodeUnderAncestorLeaseCount,
+        leaseManager.getINodeWithLeases(ancestorDirectory).size());
+  }
+
+  private Map<String, INode> createINodeTree(INodeDirectory parentDir,
+      String[] pathTree, AtomicInteger inodeId)
+      throws QuotaExceededException {
+    HashMap<String, INode> pathINodeMap = new HashMap<>();
+    for (String path : pathTree) {
+      byte[][] components = INode.getPathComponents(path);
+      FsPermission perm = FsPermission.createImmutable((short) 0755);
+      PermissionStatus permStatus =
+          PermissionStatus.createImmutable("", "", perm);
+
+      INodeDirectory prev = parentDir;
+      INodeDirectory dir = null;
+      for (int i = 0; i < components.length - 1; i++) {
+        byte[] component = components[i];
+        if (component.length == 0) {
+          continue;
+        }
+        INode existingChild = prev.getChild(
+            component, Snapshot.CURRENT_STATE_ID);
+        if (existingChild == null) {
+          String dirName = DFSUtil.bytes2String(component);
+          dir = new INodeDirectory(inodeId.incrementAndGet(), component,
+              permStatus, 0);
+          prev.addChild(dir, false, Snapshot.CURRENT_STATE_ID);
+          pathINodeMap.put(dirName, dir);
+          prev = dir;
+        } else {
+          assertTrue(existingChild.isDirectory());
+          prev = existingChild.asDirectory();
+        }
+      }
+
+      PermissionStatus p = new PermissionStatus(
+          "user", "group", new FsPermission((short) 0777));
+      byte[] fileNameBytes = components[components.length - 1];
+      String fileName = DFSUtil.bytes2String(fileNameBytes);
+      INodeFile iNodeFile = new INodeFile(
+          inodeId.incrementAndGet(), fileNameBytes,
+          p, 0L, 0L, BlockInfo.EMPTY_ARRAY, (short) 1, 1L);
+      iNodeFile.setParent(prev);
+      pathINodeMap.put(fileName, iNodeFile);
+    }
+    return pathINodeMap;
+  }
+
+
   private static FSNamesystem makeMockFsNameSystem() {
     FSDirectory dir = mock(FSDirectory.class);
     FSNamesystem fsn = mock(FSNamesystem.class);
     when(fsn.isRunning()).thenReturn(true);
+    when(fsn.hasReadLock()).thenReturn(true);
     when(fsn.hasWriteLock()).thenReturn(true);
     when(fsn.getFSDirectory()).thenReturn(dir);
     when(fsn.getMaxLockHoldToReleaseLeaseMs()).thenReturn(maxLockHoldToReleaseLeaseMs);
@@ -170,7 +442,7 @@ public class TestLeaseManager {
     PermissionStatus p = new PermissionStatus(
         "dummy", "dummy", new FsPermission((short) 0777));
     return new INodeFile(
-        inodeId, "/foo".getBytes(), p, 0L, 0L,
+        inodeId, new String("foo-" + inodeId).getBytes(), p, 0L, 0L,
         BlockInfo.EMPTY_ARRAY, (short) 1, 1L);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
index 7b7c34a..7aaadf8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
@@ -18,21 +18,28 @@
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -41,8 +48,16 @@ public class TestOpenFilesWithSnapshot {
   MiniDFSCluster cluster = null;
   DistributedFileSystem fs = null;
 
+  private static final long SEED = 0;
+  private static final short REPLICATION = 3;
+  private static final long BLOCKSIZE = 1024;
+  private static final long BUFFERLEN = BLOCKSIZE / 2;
+  private static final long FILELEN = BLOCKSIZE * 2;
+
   @Before
   public void setup() throws IOException {
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     conf.set("dfs.blocksize", "1048576");
     fs = cluster.getFileSystem();
@@ -198,6 +213,289 @@ public class TestOpenFilesWithSnapshot {
     restartNameNode();
   }
 
+  private void createFile(final Path filePath) throws IOException {
+    DFSTestUtil.createFile(fs, filePath, (int) BUFFERLEN,
+        FILELEN, BLOCKSIZE, REPLICATION, SEED);
+  }
+
+  private int writeToStream(final FSDataOutputStream outputStream, byte[] buf)
+      throws IOException {
+    outputStream.write(buf);
+    ((HdfsDataOutputStream)outputStream).hsync(
+        EnumSet.of(SyncFlag.UPDATE_LENGTH));
+    return buf.length;
+  }
+
+  /**
+   * Test open files under snapshot directories are getting captured
+   * in snapshots as a truly immutable copy. Verify open files outside
+   * of snapshot directory not getting affected.
+   *
+   * \- level_0_A
+   *   \- level_1_C
+   *     +- appA.log         (open file, not under snap root)
+   *     \- level_2_E        (Snapshottable Dir)
+   *       \- level_3_G
+   *         +- flume.log    (open file, under snap root)
+   * \- level_0_B
+   *   +- appB.log         (open file, not under snap root)
+   *   \- level_1_D        (Snapshottable Dir)
+   *     +- hbase.log      (open file, under snap root)
+   */
+  @Test (timeout = 120000)
+  public void testPointInTimeSnapshotCopiesForOpenFiles() throws Exception {
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES,
+        true);
+    // Construct the directory tree
+    final Path level0A = new Path("/level_0_A");
+    final Path level0B = new Path("/level_0_B");
+    final Path level1C = new Path(level0A, "level_1_C");
+    final Path level1D = new Path(level0B, "level_1_D");
+    final Path level2E = new Path(level1C, "level_2_E");
+    final Path level3G = new Path(level2E, "level_3_G");
+    Set<Path> dirPaths = new HashSet<>(Arrays.asList(level0A, level0B,
+        level1C, level1D, level2E, level3G));
+    for (Path dirPath : dirPaths) {
+      fs.mkdirs(dirPath);
+    }
+
+    // String constants
+    final Path flumeSnapRootDir = level2E;
+    final Path hbaseSnapRootDir = level1D;
+    final String flumeFileName = "flume.log";
+    final String hbaseFileName = "hbase.log";
+    final String appAFileName = "appA.log";
+    final String appBFileName = "appB.log";
+    final String flumeSnap1Name = "flume_snap_s1";
+    final String flumeSnap2Name = "flume_snap_s2";
+    final String flumeSnap3Name = "flume_snap_s3";
+    final String hbaseSnap1Name = "hbase_snap_s1";
+    final String hbaseSnap2Name = "hbase_snap_s2";
+    final String hbaseSnap3Name = "hbase_snap_s3";
+    final String flumeRelPathFromSnapDir = "level_3_G/" + flumeFileName;
+
+    // Create files and open a stream
+    final Path flumeFile = new Path(level3G, flumeFileName);
+    createFile(flumeFile);
+    FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
+
+    final Path hbaseFile = new Path(level1D, hbaseFileName);
+    createFile(hbaseFile);
+    FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile);
+
+    final Path appAFile = new Path(level1C, appAFileName);
+    createFile(appAFile);
+    FSDataOutputStream appAOutputStream = fs.append(appAFile);
+
+    final Path appBFile = new Path(level0B, appBFileName);
+    createFile(appBFile);
+    FSDataOutputStream appBOutputStream = fs.append(appBFile);
+
+    final long appAFileInitialLength = fs.getFileStatus(appAFile).getLen();
+    final long appBFileInitialLength = fs.getFileStatus(appBFile).getLen();
+
+    // Create Snapshot S1
+    final Path flumeS1Dir = SnapshotTestHelper.createSnapshot(
+        fs, flumeSnapRootDir, flumeSnap1Name);
+    final Path flumeS1Path = new Path(flumeS1Dir, flumeRelPathFromSnapDir);
+    final Path hbaseS1Dir = SnapshotTestHelper.createSnapshot(
+        fs, hbaseSnapRootDir, hbaseSnap1Name);
+    final Path hbaseS1Path = new Path(hbaseS1Dir, hbaseFileName);
+
+    final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen();
+    final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen();
+
+    // Verify that the Snap S1 file lengths are the same as the live ones
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        fs.getFileStatus(flumeS1Path).getLen());
+    Assert.assertEquals(hbaseFileLengthAfterS1,
+        fs.getFileStatus(hbaseS1Path).getLen());
+    Assert.assertEquals(appAFileInitialLength,
+        fs.getFileStatus(appAFile).getLen());
+    Assert.assertEquals(appBFileInitialLength,
+        fs.getFileStatus(appBFile).getLen());
+
+    long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
+    long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1;
+    long appAFileWrittenDataLength = appAFileInitialLength;
+
+    int newWriteLength = (int) (BLOCKSIZE * 1.5);
+    byte[] buf = new byte[newWriteLength];
+    Random random = new Random();
+    random.nextBytes(buf);
+
+    // Write more data to flume and hbase files only
+    flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+    hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
+
+    // Create Snapshot S2
+    final Path flumeS2Dir = SnapshotTestHelper.createSnapshot(
+        fs, flumeSnapRootDir, flumeSnap2Name);
+    final Path flumeS2Path = new Path(flumeS2Dir, flumeRelPathFromSnapDir);
+    final Path hbaseS2Dir = SnapshotTestHelper.createSnapshot(
+        fs, hbaseSnapRootDir, hbaseSnap2Name);
+    final Path hbaseS2Path = new Path(hbaseS2Dir, hbaseFileName);
+
+    // Verify live files lengths are same as all data written till now
+    final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen();
+    final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen();
+    Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
+    Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2);
+
+    // Verify if Snap S2 file lengths are same as the live ones
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        fs.getFileStatus(flumeS2Path).getLen());
+    Assert.assertEquals(hbaseFileLengthAfterS2,
+        fs.getFileStatus(hbaseS2Path).getLen());
+    Assert.assertEquals(appAFileInitialLength,
+        fs.getFileStatus(appAFile).getLen());
+    Assert.assertEquals(appBFileInitialLength,
+        fs.getFileStatus(appBFile).getLen());
+
+    // Write more data to appA file only
+    newWriteLength = (int) (BLOCKSIZE * 2.5);
+    buf = new byte[newWriteLength];
+    random.nextBytes(buf);
+    appAFileWrittenDataLength += writeToStream(appAOutputStream, buf);
+
+    // Verify other open files are not affected in their snapshots
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        fs.getFileStatus(flumeS2Path).getLen());
+    Assert.assertEquals(appAFileWrittenDataLength,
+        fs.getFileStatus(appAFile).getLen());
+
+    // Write more data to flume file only
+    newWriteLength = (int) (BLOCKSIZE * 2.5);
+    buf = new byte[newWriteLength];
+    random.nextBytes(buf);
+    flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+    // Create Snapshot S3
+    final Path flumeS3Dir = SnapshotTestHelper.createSnapshot(
+        fs, flumeSnapRootDir, flumeSnap3Name);
+    final Path flumeS3Path = new Path(flumeS3Dir, flumeRelPathFromSnapDir);
+    final Path hbaseS3Dir = SnapshotTestHelper.createSnapshot(
+        fs, hbaseSnapRootDir, hbaseSnap3Name);
+    final Path hbaseS3Path = new Path(hbaseS3Dir, hbaseFileName);
+
+    // Verify live files lengths are same as all data written till now
+    final long flumeFileLengthAfterS3 = fs.getFileStatus(flumeFile).getLen();
+    final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen();
+    Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS3);
+    Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3);
+
+    // Verify if Snap S3 file lengths are same as the live ones
+    Assert.assertEquals(flumeFileLengthAfterS3,
+        fs.getFileStatus(flumeS3Path).getLen());
+    Assert.assertEquals(hbaseFileLengthAfterS3,
+        fs.getFileStatus(hbaseS3Path).getLen());
+    Assert.assertEquals(appAFileWrittenDataLength,
+        fs.getFileStatus(appAFile).getLen());
+    Assert.assertEquals(appBFileInitialLength,
+        fs.getFileStatus(appBFile).getLen());
+
+    // Verify old flume snapshots have point-in-time / frozen file lengths
+    // even after the live file has moved forward.
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        fs.getFileStatus(flumeS1Path).getLen());
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        fs.getFileStatus(flumeS2Path).getLen());
+    Assert.assertEquals(flumeFileLengthAfterS3,
+        fs.getFileStatus(flumeS3Path).getLen());
+
+    // Verify old hbase snapshots have point-in-time / frozen file lengths
+    // even after the live files have moved forward.
+    Assert.assertEquals(hbaseFileLengthAfterS1,
+        fs.getFileStatus(hbaseS1Path).getLen());
+    Assert.assertEquals(hbaseFileLengthAfterS2,
+        fs.getFileStatus(hbaseS2Path).getLen());
+    Assert.assertEquals(hbaseFileLengthAfterS3,
+        fs.getFileStatus(hbaseS3Path).getLen());
+
+    flumeOutputStream.close();
+    hbaseOutputStream.close();
+    appAOutputStream.close();
+    appBOutputStream.close();
+  }
+
+  /**
+   * Test snapshot capturing open files and verify the same
+   * across NameNode restarts.
+   */
+  @Test (timeout = 120000)
+  public void testSnapshotsForOpenFilesWithNNRestart() throws Exception {
+    // Construct the directory tree
+    final Path level0A = new Path("/level_0_A");
+    final Path flumeSnapRootDir = level0A;
+    final String flumeFileName = "flume.log";
+    final String flumeSnap1Name = "flume_snap_1";
+    final String flumeSnap2Name = "flume_snap_2";
+
+    // Create files and open a stream
+    final Path flumeFile = new Path(level0A, flumeFileName);
+    createFile(flumeFile);
+    FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
+
+    // Create Snapshot S1
+    final Path flumeS1Dir = SnapshotTestHelper.createSnapshot(
+        fs, flumeSnapRootDir, flumeSnap1Name);
+    final Path flumeS1Path = new Path(flumeS1Dir, flumeFileName);
+    final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen();
+
+    // Verify that the Snap S1 file length is the same as the live one
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        fs.getFileStatus(flumeS1Path).getLen());
+
+    long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
+    int newWriteLength = (int) (BLOCKSIZE * 1.5);
+    byte[] buf = new byte[newWriteLength];
+    Random random = new Random();
+    random.nextBytes(buf);
+
+    // Write more data to flume file
+    flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+    // Create Snapshot S2
+    final Path flumeS2Dir = SnapshotTestHelper.createSnapshot(
+        fs, flumeSnapRootDir, flumeSnap2Name);
+    final Path flumeS2Path = new Path(flumeS2Dir, flumeFileName);
+
+    // Verify live files length is same as all data written till now
+    final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen();
+    Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
+
+    // Verify if Snap S2 file length is same as the live one
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        fs.getFileStatus(flumeS2Path).getLen());
+
+    // Write more data to flume file
+    flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+    // Verify old flume snapshots have point-in-time / frozen file lengths
+    // even after the live file has moved forward.
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        fs.getFileStatus(flumeS1Path).getLen());
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        fs.getFileStatus(flumeS2Path).getLen());
+
+    // Restart the NameNode
+    restartNameNode();
+    cluster.waitActive();
+
+    // Verify live file length hasn't changed after NN restart
+    Assert.assertEquals(flumeFileWrittenDataLength,
+        fs.getFileStatus(flumeFile).getLen());
+
+    // Verify old flume snapshots have point-in-time / frozen file lengths
+    // after the NN restart and after the live file has moved forward.
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        fs.getFileStatus(flumeS1Path).getLen());
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        fs.getFileStatus(flumeS2Path).getLen());
+
+    flumeOutputStream.close();
+  }
+
   private void restartNameNode() throws Exception {
     cluster.triggerBlockReports();
     NameNode nameNode = cluster.getNameNode();

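For context, below is a minimal standalone sketch (not part of this patch) of how the
open-file capture behavior exercised by the tests above can be driven against a
MiniDFSCluster. The directory and file names, the cluster sizing and the sketch class
itself are illustrative assumptions; the flow simply mirrors the tests.

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

public class OpenFileSnapshotSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Enable point-in-time capture of open files in snapshots (off by default).
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);

    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();

      Path snapRoot = new Path("/level_0_A");        // illustrative snapshot root
      Path file = new Path(snapRoot, "flume.log");   // illustrative open file
      fs.mkdirs(snapRoot);
      fs.allowSnapshot(snapRoot);

      // Open the file and write some data, syncing the length to the NameNode.
      FSDataOutputStream out = fs.create(file);
      out.write(new byte[1024]);
      ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));

      // Take a snapshot while the file is still open for write.
      fs.createSnapshot(snapRoot, "s1");
      long lenAtS1 = fs.getFileStatus(file).getLen();

      // Keep writing; the live file grows but the snapshot copy stays frozen.
      out.write(new byte[1024]);
      ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
      out.close();

      Path fileInS1 = new Path(snapRoot, ".snapshot/s1/flume.log");
      System.out.println("Length captured in s1: " + lenAtS1
          + ", length at snapshot path now: " + fs.getFileStatus(fileInS1).getLen()
          + ", live length: " + fs.getFileStatus(file).getLen());
    } finally {
      cluster.shutdown();
    }
  }
}
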
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java
index 453afac..b9451bc 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java
@@ -22,20 +22,29 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,12 +52,13 @@ import org.junit.Test;
  * Tests snapshot deletion.
  */
 public class TestSnapshotDiffReport {
-  protected static final long seed = 0;
-  protected static final short REPLICATION = 3;
-  protected static final short REPLICATION_1 = 2;
-  protected static final long BLOCKSIZE = 1024;
-  public static final int SNAPSHOTNUMBER = 10;
-  
+  private static final long SEED = 0;
+  private static final short REPLICATION = 3;
+  private static final short REPLICATION_1 = 2;
+  private static final long BLOCKSIZE = 1024;
+  private static final long BUFFERLEN = BLOCKSIZE / 2;
+  private static final long FILELEN = BLOCKSIZE * 2;
+
   private final Path dir = new Path("/TestSnapshot");
   private final Path sub1 = new Path(dir, "sub1");
   
@@ -61,6 +71,8 @@ public class TestSnapshotDiffReport {
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
         .format(true).build();
     cluster.waitActive();
@@ -97,10 +109,10 @@ public class TestSnapshotDiffReport {
     Path link13 = new Path(modifyDir, "link13");
     Path file14 = new Path(modifyDir, "file14");
     Path file15 = new Path(modifyDir, "file15");
-    DFSTestUtil.createFile(hdfs, file10, BLOCKSIZE, REPLICATION_1, seed);
-    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION_1, seed);
-    DFSTestUtil.createFile(hdfs, file12, BLOCKSIZE, REPLICATION_1, seed);
-    DFSTestUtil.createFile(hdfs, file13, BLOCKSIZE, REPLICATION_1, seed);
+    DFSTestUtil.createFile(hdfs, file10, BLOCKSIZE, REPLICATION_1, SEED);
+    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION_1, SEED);
+    DFSTestUtil.createFile(hdfs, file12, BLOCKSIZE, REPLICATION_1, SEED);
+    DFSTestUtil.createFile(hdfs, file13, BLOCKSIZE, REPLICATION_1, SEED);
     // create link13
     hdfs.createSymlink(file13, link13, false);
     // create snapshot
@@ -118,9 +130,9 @@ public class TestSnapshotDiffReport {
     // delete link13
     hdfs.delete(link13, false);
     // create file14
-    DFSTestUtil.createFile(hdfs, file14, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, file14, BLOCKSIZE, REPLICATION, SEED);
     // create file15
-    DFSTestUtil.createFile(hdfs, file15, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, file15, BLOCKSIZE, REPLICATION, SEED);
     
     // create snapshot
     for (Path snapshotDir : snapshotDirs) {
@@ -128,7 +140,7 @@ public class TestSnapshotDiffReport {
     }
     
     // create file11 again
-    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION, SEED);
     // delete file12
     hdfs.delete(file12, true);
     // modify file13
@@ -386,8 +398,8 @@ public class TestSnapshotDiffReport {
     final Path fileInFoo = new Path(foo, "file");
     final Path bar = new Path(dir2, "bar");
     final Path fileInBar = new Path(bar, "file");
-    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
-    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, SEED);
+    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, SEED);
 
     // create snapshot on /dir1
     SnapshotTestHelper.createSnapshot(hdfs, dir1, "s0");
@@ -421,8 +433,8 @@ public class TestSnapshotDiffReport {
     final Path fileInFoo = new Path(foo, "file");
     final Path bar = new Path(dir2, "bar");
     final Path fileInBar = new Path(bar, "file");
-    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
-    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, SEED);
+    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, SEED);
 
     SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
     hdfs.rename(fileInFoo, fileInBar, Rename.OVERWRITE);
@@ -454,7 +466,7 @@ public class TestSnapshotDiffReport {
     final Path root = new Path("/");
     final Path foo = new Path(root, "foo");
     final Path fileInFoo = new Path(foo, "file");
-    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, SEED);
 
     SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
     final Path bar = new Path(root, "bar");
@@ -478,7 +490,7 @@ public class TestSnapshotDiffReport {
   public void testDiffReportWithRenameAndAppend() throws Exception {
     final Path root = new Path("/");
     final Path foo = new Path(root, "foo");
-    DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPLICATION, SEED);
 
     SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
     final Path bar = new Path(root, "bar");
@@ -504,7 +516,7 @@ public class TestSnapshotDiffReport {
     final Path root = new Path("/");
     final Path foo = new Path(root, "foo");
     final Path bar = new Path(foo, "bar");
-    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, SEED);
 
     SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
     // rename /foo to /foo2
@@ -529,4 +541,140 @@ public class TestSnapshotDiffReport {
         new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("foo2/bar"),
             DFSUtil.string2Bytes("foo2/bar-new")));
   }
+
+  private void createFile(final Path filePath) throws IOException {
+    DFSTestUtil.createFile(hdfs, filePath, (int) BUFFERLEN,
+        FILELEN, BLOCKSIZE, REPLICATION, SEED);
+  }
+
+  private int writeToStream(final FSDataOutputStream outputStream,
+      byte[] buf) throws IOException {
+    outputStream.write(buf);
+    ((HdfsDataOutputStream)outputStream).hsync(
+        EnumSet.of(SyncFlag.UPDATE_LENGTH));
+    return buf.length;
+  }
+
+  private void restartNameNode() throws Exception {
+    cluster.triggerBlockReports();
+    NameNode nameNode = cluster.getNameNode();
+    NameNodeAdapter.enterSafeMode(nameNode, false);
+    NameNodeAdapter.saveNamespace(nameNode);
+    NameNodeAdapter.leaveSafeMode(nameNode);
+    cluster.restartNameNode(true);
+  }
+
+  /**
+   * Test snapshot diff reports for snapshots with open files captured in them.
+   * Also verify that the diff report remains the same across NameNode restarts.
+   */
+  @Test (timeout = 120000)
+  public void testDiffReportWithOpenFiles() throws Exception {
+    // Construct the directory tree
+    final Path level0A = new Path("/level_0_A");
+    final Path flumeSnapRootDir = level0A;
+    final String flumeFileName = "flume.log";
+    final String flumeSnap1Name = "flume_snap_1";
+    final String flumeSnap2Name = "flume_snap_2";
+
+    // Create files and open a stream
+    final Path flumeFile = new Path(level0A, flumeFileName);
+    createFile(flumeFile);
+    FSDataOutputStream flumeOutputStream = hdfs.append(flumeFile);
+
+    // Create Snapshot S1
+    final Path flumeS1Dir = SnapshotTestHelper.createSnapshot(
+        hdfs, flumeSnapRootDir, flumeSnap1Name);
+    final Path flumeS1Path = new Path(flumeS1Dir, flumeFileName);
+    final long flumeFileLengthAfterS1 = hdfs.getFileStatus(flumeFile).getLen();
+
+    // Verify that the Snap S1 file length is the same as the live one
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        hdfs.getFileStatus(flumeS1Path).getLen());
+
+    verifyDiffReport(level0A, flumeSnap1Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")));
+
+    long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
+    int newWriteLength = (int) (BLOCKSIZE * 1.5);
+    byte[] buf = new byte[newWriteLength];
+    Random random = new Random();
+    random.nextBytes(buf);
+
+    // Write more data to flume file
+    flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+    // Create Snapshot S2
+    final Path flumeS2Dir = SnapshotTestHelper.createSnapshot(
+        hdfs, flumeSnapRootDir, flumeSnap2Name);
+    final Path flumeS2Path = new Path(flumeS2Dir, flumeFileName);
+
+    // Verify the live file's length equals all the data written so far
+    final long flumeFileLengthAfterS2 = hdfs.getFileStatus(flumeFile).getLen();
+    Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
+
+    // Verify that the Snap S2 file length is the same as the live one
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        hdfs.getFileStatus(flumeS2Path).getLen());
+
+    verifyDiffReport(level0A, flumeSnap1Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    verifyDiffReport(level0A, flumeSnap2Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")));
+
+    verifyDiffReport(level0A, flumeSnap1Name, flumeSnap2Name,
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    // Write more data to flume file
+    flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+    // Verify old flume snapshots have point-in-time / frozen file lengths
+    // even after the live file has moved forward.
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        hdfs.getFileStatus(flumeS1Path).getLen());
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        hdfs.getFileStatus(flumeS2Path).getLen());
+
+    flumeOutputStream.close();
+
+    // Verify the now-closed live file's length equals all data written
+    Assert.assertEquals(flumeFileWrittenDataLength,
+        hdfs.getFileStatus(flumeFile).getLen());
+
+    // Verify old flume snapshots have point-in-time / frozen file lengths
+    // even after the live file has moved forward.
+    Assert.assertEquals(flumeFileLengthAfterS1,
+        hdfs.getFileStatus(flumeS1Path).getLen());
+    Assert.assertEquals(flumeFileLengthAfterS2,
+        hdfs.getFileStatus(flumeS2Path).getLen());
+
+    verifyDiffReport(level0A, flumeSnap1Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    verifyDiffReport(level0A, flumeSnap2Name, "",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    verifyDiffReport(level0A, flumeSnap1Name, flumeSnap2Name,
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+    restartNameNode();
+
+    verifyDiffReport(level0A, flumeSnap1Name, flumeSnap2Name,
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes(flumeFileName)));
+
+  }
+
 }
\ No newline at end of file

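For reference, a minimal sketch (not part of this patch) of consuming such a diff
report programmatically. The helper class and method names are made up; it assumes
an existing DistributedFileSystem handle and the snapshot names used by the test
above, and an empty string for the "to" snapshot denotes the current directory
state, as in verifyDiffReport.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;

class DiffReportSketch {
  // Prints every entry of the diff between two snapshots of snapRoot.
  static void printDiff(DistributedFileSystem fs, Path snapRoot,
      String fromSnapshot, String toSnapshot) throws IOException {
    SnapshotDiffReport report =
        fs.getSnapshotDiffReport(snapRoot, fromSnapshot, toSnapshot);
    for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) {
      // With dfs.namenode.snapshot.capture.openfiles enabled, a file that was open
      // and grew between the two snapshots is expected to show up as a MODIFY entry.
      System.out.println(entry);
    }
  }
}

// e.g. printDiff(fs, new Path("/level_0_A"), "flume_snap_1", "flume_snap_2");
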
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e3ae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
index be14305..fd35388 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
@@ -23,11 +23,13 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,25 +48,26 @@ public class TestSnapshotManager {
   public void testSnapshotLimits() throws Exception {
     // Setup mock objects for SnapshotManager.createSnapshot.
     //
+    LeaseManager leaseManager = mock(LeaseManager.class);
     INodeDirectory ids = mock(INodeDirectory.class);
     FSDirectory fsdir = mock(FSDirectory.class);
     INodesInPath iip = mock(INodesInPath.class);
 
-    SnapshotManager sm = spy(new SnapshotManager(fsdir));
+    SnapshotManager sm = spy(new SnapshotManager(new Configuration(), fsdir));
     doReturn(ids).when(sm).getSnapshottableRoot((INodesInPath) anyObject());
     doReturn(testMaxSnapshotLimit).when(sm).getMaxSnapshotID();
 
     // Create testMaxSnapshotLimit snapshots. These should all succeed.
     //
     for (Integer i = 0; i < testMaxSnapshotLimit; ++i) {
-      sm.createSnapshot(iip, "dummy", i.toString());
+      sm.createSnapshot(leaseManager, iip, "dummy", i.toString());
     }
 
     // Attempt to create one more snapshot. This should fail due to snapshot
     // ID rollover.
     //
     try {
-      sm.createSnapshot(iip, "dummy", "shouldFailSnapshot");
+      sm.createSnapshot(leaseManager, iip, "dummy", "shouldFailSnapshot");
       Assert.fail("Expected SnapshotException not thrown");
     } catch (SnapshotException se) {
       Assert.assertTrue(
@@ -79,7 +82,7 @@ public class TestSnapshotManager {
     // to snapshot ID rollover.
     //
     try {
-      sm.createSnapshot(iip, "dummy", "shouldFailSnapshot2");
+      sm.createSnapshot(leaseManager, iip, "dummy", "shouldFailSnapshot2");
       Assert.fail("Expected SnapshotException not thrown");
     } catch (SnapshotException se) {
       Assert.assertTrue(


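As the hunks above show, SnapshotManager is now constructed with a Configuration and
createSnapshot() additionally takes the LeaseManager. Below is a minimal mock-based
sketch of the updated call shape, mirroring the test's setup; the sketch class and
method names are illustrative, and passing the LeaseManager presumably lets the
snapshot record files with active leases under the snapshot root.

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;

class SnapshotManagerCallShapeSketch {
  static void createOneSnapshot() throws Exception {
    FSDirectory fsdir = mock(FSDirectory.class);
    LeaseManager leaseManager = mock(LeaseManager.class);
    INodesInPath iip = mock(INodesInPath.class);

    // The constructor now takes a Configuration, presumably where the
    // dfs.namenode.snapshot.capture.openfiles setting is read from.
    SnapshotManager sm = spy(new SnapshotManager(new Configuration(), fsdir));
    doReturn(mock(INodeDirectory.class))
        .when(sm).getSnapshottableRoot((INodesInPath) anyObject());

    // createSnapshot() now also receives the LeaseManager.
    sm.createSnapshot(leaseManager, iip, "dummy", "s1");
  }
}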