Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
 Thu Aug 21 05:22:10 2014
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.Closeable;
@@ -29,11 +32,13 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -50,10 +55,12 @@ import org.apache.hadoop.hdfs.BlockStora
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import org.apache.hadoop.hdfs.protocol.FsAclPermission;
@@ -65,6 +72,8 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -113,9 +122,14 @@ public class FSDirectory implements Clos
       + DOT_RESERVED_STRING;
   public final static byte[] DOT_RESERVED = 
       DFSUtil.string2Bytes(DOT_RESERVED_STRING);
+  private final static String RAW_STRING = "raw";
+  private final static byte[] RAW = DFSUtil.string2Bytes(RAW_STRING);
   public final static String DOT_INODES_STRING = ".inodes";
   public final static byte[] DOT_INODES = 
       DFSUtil.string2Bytes(DOT_INODES_STRING);
+  private final XAttr KEYID_XATTR =
+      XAttrHelper.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, null);
+
   INodeDirectory rootDir;
   private final FSNamesystem namesystem;
   private volatile boolean skipQuotaCheck = false; //skip while consuming edits
@@ -152,7 +166,7 @@ public class FSDirectory implements Clos
   }
 
   boolean hasReadLock() {
-    return this.dirLock.getReadHoldCount() > 0;
+    return this.dirLock.getReadHoldCount() > 0 || hasWriteLock();
   }
 
   public int getReadHoldCount() {
@@ -163,6 +177,9 @@ public class FSDirectory implements Clos
     return this.dirLock.getWriteHoldCount();
   }
 
+  @VisibleForTesting
+  public final EncryptionZoneManager ezManager;
+
   /**
    * Caches frequently used file names used in {@link INode} to reuse 
    * byte[] objects and reduce heap usage.
@@ -191,6 +208,7 @@ public class FSDirectory implements Clos
     this.inodeXAttrsLimit = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_DEFAULT);
+
     Preconditions.checkArgument(this.inodeXAttrsLimit >= 0,
         "Cannot set a negative limit on the number of xattrs per inode (%s).",
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY);
@@ -210,6 +228,8 @@ public class FSDirectory implements Clos
         + " times");
     nameCache = new NameCache<ByteArray>(threshold);
     namesystem = ns;
+
+    ezManager = new EncryptionZoneManager(this, conf);
   }
     
   private FSNamesystem getFSNamesystem() {
@@ -511,6 +531,7 @@ public class FSDirectory implements Clos
       return false;
     }
     
+    ezManager.checkMoveValidity(srcIIP, dstIIP, src);
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
@@ -589,6 +610,7 @@ public class FSDirectory implements Clos
       throw new IOException(error);
     }
 
+    ezManager.checkMoveValidity(srcIIP, dstIIP, src);
     final INode dstInode = dstIIP.getLastINode();
     List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
     if (dstInode != null) { // Destination exists
@@ -649,15 +671,20 @@ public class FSDirectory implements Clos
         tx.updateMtimeAndLease(timestamp);
 
         // Collect the blocks and remove the lease for previous dst
-        long filesDeleted = -1;
+        boolean filesDeleted = false;
         if (removedDst != null) {
           undoRemoveDst = false;
           if (removedNum > 0) {
             BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
             List<INode> removedINodes = new ChunkedArrayList<INode>();
-            filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID,
-                dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes,
-                true).get(Quota.NAMESPACE);
+            if (!removedDst.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
+              removedDst.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+              filesDeleted = true;
+            } else {
+              filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID,
+                  dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes,
+                  true).get(Quota.NAMESPACE) >= 0;
+            }
             getFSNamesystem().removePathAndBlocks(src, collectedBlocks,
                 removedINodes, false);
           }
@@ -670,7 +697,7 @@ public class FSDirectory implements Clos
         }
 
         tx.updateQuotasInSourceTree();
-        return filesDeleted >= 0;
+        return filesDeleted;
       }
     } finally {
       if (undoRemoveSrc) {
@@ -1333,6 +1360,7 @@ public class FSDirectory implements Clos
       boolean needLocation, boolean isSuperUser)
       throws UnresolvedLinkException, IOException {
     String srcs = normalizePath(src);
+    final boolean isRawPath = isReservedRawName(src);
 
     readLock();
     try {
@@ -1351,7 +1379,8 @@ public class FSDirectory implements Clos
       if (!targetNode.isDirectory()) {
         return new DirectoryListing(
             new HdfsFileStatus[]{createFileStatus(HdfsFileStatus.EMPTY_NAME,
-                targetNode, needLocation, parentStoragePolicy, snapshot)}, 0);
+                targetNode, needLocation, parentStoragePolicy, snapshot,
+                isRawPath)}, 0);
       }
 
       final INodeDirectory dirInode = targetNode.asDirectory();
@@ -1365,9 +1394,10 @@ public class FSDirectory implements Clos
       for (int i=0; i<numOfListing && locationBudget>0; i++) {
         INode cur = contents.get(startChild+i);
         byte curPolicy = cur.getStoragePolicyID(snapshot);
-        listing[i] = createFileStatus(cur.getLocalNameBytes(), cur,
-            needLocation, curPolicy != BlockStoragePolicy.ID_UNSPECIFIED ?
-                curPolicy : parentStoragePolicy, snapshot);
+        listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation,
+            curPolicy != BlockStoragePolicy.ID_UNSPECIFIED ?
+                curPolicy : parentStoragePolicy,
+            snapshot, isRawPath);
         listingCnt++;
         if (needLocation) {
             // Once we  hit lsLimit locations, stop.
@@ -1418,7 +1448,7 @@ public class FSDirectory implements Clos
     for (int i = 0; i < numOfListing; i++) {
       Root sRoot = snapshots.get(i + skipSize).getRoot();
       listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot,
-          BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID);
+          BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, false);
     }
     return new DirectoryListing(
         listing, snapshots.size() - skipSize - numOfListing);
@@ -1426,12 +1456,13 @@ public class FSDirectory implements Clos
 
   /** Get the file info for a specific file.
    * @param src The string representation of the path to the file
-   * @param resolveLink whether to throw UnresolvedLinkException 
+   * @param resolveLink whether to throw UnresolvedLinkException
+   * @param isRawPath true if a /.reserved/raw pathname was passed by the user
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
-      throws UnresolvedLinkException {
+  HdfsFileStatus getFileInfo(String src, boolean resolveLink, boolean isRawPath)
+    throws IOException {
     String srcs = normalizePath(src);
     readLock();
     try {
@@ -1441,7 +1472,8 @@ public class FSDirectory implements Clos
       final INodesInPath inodesInPath = getLastINodeInPath(srcs, resolveLink);
       final INode i = inodesInPath.getINode(0);
       return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
-          BlockStoragePolicy.ID_UNSPECIFIED, inodesInPath.getPathSnapshotId());
+          BlockStoragePolicy.ID_UNSPECIFIED, inodesInPath.getPathSnapshotId(),
+          isRawPath);
     } finally {
       readUnlock();
     }
@@ -1458,7 +1490,7 @@ public class FSDirectory implements Clos
       throws UnresolvedLinkException {
     if (getINode4DotSnapshot(src) != null) {
       return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
-          HdfsFileStatus.EMPTY_NAME, -1L, 0, BlockStoragePolicy.ID_UNSPECIFIED);
+          HdfsFileStatus.EMPTY_NAME, -1L, 0, null, BlockStoragePolicy.ID_UNSPECIFIED);
     }
     return null;
   }
@@ -2086,6 +2118,19 @@ public class FSDirectory implements Clos
   public final void addToInodeMap(INode inode) {
     if (inode instanceof INodeWithAdditionalFields) {
       inodeMap.put(inode);
+      if (!inode.isSymlink()) {
+        final XAttrFeature xaf = inode.getXAttrFeature();
+        if (xaf != null) {
+          final List<XAttr> xattrs = xaf.getXAttrs();
+          for (XAttr xattr : xattrs) {
+            final String xaName = XAttrHelper.getPrefixName(xattr);
+            if (CRYPTO_XATTR_ENCRYPTION_ZONE.equals(xaName)) {
+              ezManager.addEncryptionZone(inode.getId(),
+                  new String(xattr.getValue()));
+            }
+          }
+        }
+      }
     }
   }
   
@@ -2097,6 +2142,7 @@ public class FSDirectory implements Clos
       for (INode inode : inodes) {
         if (inode != null && inode instanceof INodeWithAdditionalFields) {
           inodeMap.remove(inode);
+          ezManager.removeEncryptionZone(inode.getId());
         }
       }
     }
@@ -2266,15 +2312,18 @@ public class FSDirectory implements Clos
    * @param path the local name
    * @param node inode
    * @param needLocation if block locations need to be included or not
+   * @param isRawPath true if this is being called on behalf of a path in
+   *                  /.reserved/raw
    * @return a file status
    * @throws IOException if any error occurs
    */
   private HdfsFileStatus createFileStatus(byte[] path, INode node,
-      boolean needLocation, byte storagePolicy, int snapshot) throws IOException {
+      boolean needLocation, byte storagePolicy, int snapshot, boolean isRawPath)
+      throws IOException {
     if (needLocation) {
-      return createLocatedFileStatus(path, node, storagePolicy, snapshot);
+      return createLocatedFileStatus(path, node, storagePolicy, snapshot, isRawPath);
     } else {
-      return createFileStatus(path, node, storagePolicy, snapshot);
+      return createFileStatus(path, node, storagePolicy, snapshot, isRawPath);
     }
   }
 
@@ -2282,7 +2331,7 @@ public class FSDirectory implements Clos
    * Create FileStatus by file INode 
    */
   HdfsFileStatus createFileStatus(byte[] path, INode node, byte storagePolicy,
-      int snapshot) {
+      int snapshot, boolean isRawPath) throws IOException {
      long size = 0;     // length is zero for directories
      short replication = 0;
      long blocksize = 0;
@@ -2294,6 +2343,8 @@ public class FSDirectory implements Clos
      }
      int childrenNum = node.isDirectory() ? 
          node.asDirectory().getChildrenNum(snapshot) : 0;
+     FileEncryptionInfo feInfo = isRawPath ? null :
+         getFileEncryptionInfo(node, snapshot);
 
      return new HdfsFileStatus(
         size, 
@@ -2308,7 +2359,9 @@ public class FSDirectory implements Clos
         node.isSymlink() ? node.asSymlink().getSymlink() : null,
         path,
         node.getId(),
-        childrenNum, storagePolicy);
+        childrenNum,
+        feInfo,
+        storagePolicy);
   }
 
   private byte getStoragePolicy(INode[] inodes, int snapshotId) {
@@ -2324,13 +2377,15 @@ public class FSDirectory implements Clos
   /**
    * Create FileStatus with location info by file INode
    */
-  private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path,
-      INode node, byte storagePolicy, int snapshot) throws IOException {
+  private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path, INode node,
+      byte storagePolicy, int snapshot, boolean isRawPath) throws IOException {
     assert hasReadLock();
     long size = 0; // length is zero for directories
     short replication = 0;
     long blocksize = 0;
     LocatedBlocks loc = null;
+    final FileEncryptionInfo feInfo = isRawPath ? null :
+        getFileEncryptionInfo(node, snapshot);
     if (node.isFile()) {
       final INodeFile fileNode = node.asFile();
       size = fileNode.computeFileSize(snapshot);
@@ -2341,9 +2396,10 @@ public class FSDirectory implements Clos
       final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
       final long fileSize = !inSnapshot && isUc ? 
           fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
+
       loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
           fileNode.getBlocks(), fileSize, isUc, 0L, size, false,
-          inSnapshot);
+          inSnapshot, feInfo);
       if (loc == null) {
         loc = new LocatedBlocks();
       }
@@ -2358,7 +2414,7 @@ public class FSDirectory implements Clos
           getPermissionForFileStatus(node, snapshot),
           node.getUserName(snapshot), node.getGroupName(snapshot),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-          node.getId(), loc, childrenNum, storagePolicy);
+          node.getId(), loc, childrenNum, feInfo, storagePolicy);
     // Set caching information for the located blocks.
     if (loc != null) {
       CacheManager cacheManager = namesystem.getCacheManager();
@@ -2605,6 +2661,8 @@ public class FSDirectory implements Clos
       for (ListIterator<XAttr> it = toFilter.listIterator(); it.hasNext()
           ;) {
         XAttr filter = it.next();
+        Preconditions.checkArgument(!KEYID_XATTR.equalsIgnoreValue(filter),
+            "The encryption zone xattr should never be deleted.");
         if (a.equalsIgnoreValue(filter)) {
           add = false;
           it.remove();
@@ -2619,7 +2677,111 @@ public class FSDirectory implements Clos
 
     return newXAttrs;
   }
-  
+
+  /**
+   * Ask the encryption zone manager whether the given resolved path is in
+   * an encryption zone, holding the directory read lock for the duration
+   * of the query.
+   *
+   * @param iip resolved inodes of the path being queried
+   * @return the result of {@code ezManager.isInAnEZ(iip)}
+   */
+  boolean isInAnEZ(INodesInPath iip)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    readLock();
+    try {
+      final boolean insideZone = ezManager.isInAnEZ(iip);
+      return insideZone;
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Fetch the key name that the encryption zone manager reports for the
+   * given resolved path. The lookup runs under the directory read lock.
+   *
+   * @param iip resolved inodes of the path being queried
+   * @return the result of {@code ezManager.getKeyName(iip)}
+   */
+  String getKeyName(INodesInPath iip) {
+    readLock();
+    try {
+      final String keyName = ezManager.getKeyName(iip);
+      return keyName;
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Create an encryption zone through the encryption zone manager while
+   * holding the directory write lock.
+   *
+   * @param src path handed to the encryption zone manager
+   * @param keyName key name handed to the encryption zone manager
+   * @return the XAttr returned by
+   *         {@code ezManager.createEncryptionZone(src, keyName)}
+   * @throws IOException propagated from the encryption zone manager
+   */
+  XAttr createEncryptionZone(String src, String keyName)
+    throws IOException {
+    writeLock();
+    try {
+      final XAttr zoneXAttr = ezManager.createEncryptionZone(src, keyName);
+      return zoneXAttr;
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Look up, under the directory read lock, the encryption zone that the
+   * encryption zone manager associates with the given resolved path.
+   *
+   * @param iip resolved inodes of the path being queried
+   * @return the result of {@code ezManager.getEZINodeForPath(iip)}
+   */
+  EncryptionZoneWithId getEZForPath(INodesInPath iip) {
+    readLock();
+    try {
+      final EncryptionZoneWithId zone = ezManager.getEZINodeForPath(iip);
+      return zone;
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Retrieve a batch of encryption zones from the encryption zone manager
+   * while holding the directory read lock.
+   *
+   * @param prevId cursor value forwarded to
+   *               {@code ezManager.listEncryptionZones}
+   * @return the batch returned by the encryption zone manager
+   * @throws IOException propagated from the encryption zone manager
+   */
+  BatchedListEntries<EncryptionZoneWithId> listEncryptionZones(long prevId)
+      throws IOException {
+    readLock();
+    try {
+      final BatchedListEntries<EncryptionZoneWithId> batch =
+          ezManager.listEncryptionZones(prevId);
+      return batch;
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Attach a FileEncryptionInfo to the inode at the given path.
+   * <p/>
+   * The info is converted to its protobuf form and stored as the value of
+   * the CRYPTO_XATTR_FILE_ENCRYPTION_INFO xattr, set with the CREATE flag.
+   *
+   * @param src path whose inode receives the xattr
+   * @param info encryption info to store
+   * @throws IOException propagated from the xattr update
+   */
+  void setFileEncryptionInfo(String src, FileEncryptionInfo info)
+      throws IOException {
+    // Serialize before taking the lock; only the xattr update needs it.
+    final byte[] serializedInfo = PBHelper.convert(info).toByteArray();
+    final List<XAttr> toSet = Lists.newArrayListWithCapacity(1);
+    toSet.add(XAttrHelper.buildXAttr(
+        CRYPTO_XATTR_FILE_ENCRYPTION_INFO, serializedInfo));
+
+    writeLock();
+    try {
+      unprotectedSetXAttrs(src, toSet, EnumSet.of(XAttrSetFlag.CREATE));
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Look up the FileEncryptionInfo stored on an inode.
+   *
+   * @param inode inode to inspect
+   * @param snapshotId snapshot whose xattrs are read
+   * @return the decoded info, or null when the inode is not a file, has no
+   *         xattrs, or carries no CRYPTO_XATTR_FILE_ENCRYPTION_INFO xattr
+   * @throws IOException if the stored xattr value cannot be parsed as a
+   *         FileEncryptionInfoProto
+   */
+  FileEncryptionInfo getFileEncryptionInfo(INode inode, int snapshotId)
+      throws IOException {
+    if (!inode.isFile()) {
+      return null;
+    }
+    readLock();
+    try {
+      final List<XAttr> attrs =
+          XAttrStorage.readINodeXAttrs(inode, snapshotId);
+      if (attrs == null) {
+        return null;
+      }
+      for (XAttr attr : attrs) {
+        if (!XAttrHelper.getPrefixName(attr)
+            .equals(CRYPTO_XATTR_FILE_ENCRYPTION_INFO)) {
+          // Some other xattr; keep scanning.
+          continue;
+        }
+        try {
+          return PBHelper.convert(
+              HdfsProtos.FileEncryptionInfoProto.parseFrom(attr.getValue()));
+        } catch (InvalidProtocolBufferException e) {
+          throw new IOException("Could not parse file encryption info for " +
+              "inode " + inode, e);
+        }
+      }
+      return null;
+    } finally {
+      readUnlock();
+    }
+  }
+
   void setXAttrs(final String src, final List<XAttr> xAttrs,
       final EnumSet<XAttrSetFlag> flag) throws IOException {
     writeLock();
@@ -2630,7 +2792,7 @@ public class FSDirectory implements Clos
     }
   }
   
-  void unprotectedSetXAttrs(final String src, final List<XAttr> xAttrs,
+  INode unprotectedSetXAttrs(final String src, final List<XAttr> xAttrs,
       final EnumSet<XAttrSetFlag> flag)
       throws QuotaExceededException, IOException {
     assert hasWriteLock();
@@ -2639,7 +2801,20 @@ public class FSDirectory implements Clos
     int snapshotId = iip.getLatestSnapshotId();
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
     List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, xAttrs, flag);
+
+    /*
+     * If we're adding the encryption zone xattr, then add src to the list
+     * of encryption zones.
+     */
+    for (XAttr xattr : newXAttrs) {
+      final String xaName = XAttrHelper.getPrefixName(xattr);
+      if (CRYPTO_XATTR_ENCRYPTION_ZONE.equals(xaName)) {
+        ezManager.addEncryptionZone(inode.getId(), new String(xattr.getValue()));
+      }
+    }
+
     XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
+    return inode;
   }
 
   List<XAttr> setINodeXAttrs(final List<XAttr> existingXAttrs,
@@ -2796,27 +2971,73 @@ public class FSDirectory implements Clos
     return src.startsWith(DOT_RESERVED_PATH_PREFIX);
   }
 
+  /**
+   * Check whether a path names the reserved "raw" directory or something
+   * beneath it.
+   * <p/>
+   * The match is made on a whole path component: the path must either be
+   * exactly the reserved prefix followed by the raw component, or continue
+   * with a path separator after it. A bare prefix match would also accept
+   * siblings whose name merely begins with "raw", which resolvePath does
+   * not treat as raw paths because it compares the full component.
+   *
+   * @param src path to test; must not be null
+   * @return true if src is the raw directory or a path under it
+   */
+  static boolean isReservedRawName(String src) {
+    final String rawDir =
+        DOT_RESERVED_PATH_PREFIX + Path.SEPARATOR + RAW_STRING;
+    return src.equals(rawDir) || src.startsWith(rawDir + Path.SEPARATOR);
+  }
+
   /**
-   * Resolve the path of /.reserved/.inodes/<inodeid>/... to a regular path
+   * Resolve a /.reserved/... path to a non-reserved path.
+   * <p/>
+   * There are two special hierarchies under /.reserved/:
+   * <p/>
+   * /.reserved/.inodes/<inodeid> performs a path lookup by inodeid,
+   * <p/>
+   * /.reserved/raw/... returns the encrypted (raw) bytes of a file in an
+   * encryption zone. For instance, if /ezone is an encryption zone, then
+   * /ezone/a refers to the decrypted file and /.reserved/raw/ezone/a refers to
+   * the encrypted (raw) bytes of /ezone/a.
+   * <p/>
+   * Pathnames in the /.reserved/raw directory that resolve to files not in an
+   * encryption zone are equivalent to the corresponding non-raw path. Hence,
+   * if /a/b/c refers to a file that is not in an encryption zone, then
+   * /.reserved/raw/a/b/c is equivalent (they both refer to the same
+   * unencrypted file).
    * 
    * @param src path that is being processed
    * @param pathComponents path components corresponding to the path
    * @param fsd FSDirectory
-   * @return if the path indicates an inode, return path after replacing upto
+   * @return if the path indicates an inode, return path after replacing up to
    *         <inodeid> with the corresponding path of the inode, else the path
-   *         in {@code src} as is.
+   *         in {@code src} as is. If the path refers to a path in the "raw"
+   *         directory, return the non-raw pathname.
    * @throws FileNotFoundException if inodeid is invalid
    */
-  static String resolvePath(String src, byte[][] pathComponents, FSDirectory fsd)
-      throws FileNotFoundException {
-    if (pathComponents == null || pathComponents.length <= 3) {
+  static String resolvePath(String src, byte[][] pathComponents,
+      FSDirectory fsd) throws FileNotFoundException {
+    final int nComponents = (pathComponents == null) ?
+        0 : pathComponents.length;
+    if (nComponents <= 2) {
       return src;
     }
-    // Not /.reserved/.inodes
-    if (!Arrays.equals(DOT_RESERVED, pathComponents[1])
-        || !Arrays.equals(DOT_INODES, pathComponents[2])) { // Not .inodes path
+    if (!Arrays.equals(DOT_RESERVED, pathComponents[1])) {
+      /* This is not a /.reserved/ path so do nothing. */
+      return src;
+    }
+
+    if (Arrays.equals(DOT_INODES, pathComponents[2])) {
+      /* It's a /.reserved/.inodes path. */
+      if (nComponents > 3) {
+        return resolveDotInodesPath(src, pathComponents, fsd);
+      } else {
+        return src;
+      }
+    } else if (Arrays.equals(RAW, pathComponents[2])) {
+      /* It's /.reserved/raw so strip off the /.reserved/raw prefix. */
+      if (nComponents == 3) {
+        return Path.SEPARATOR;
+      } else {
+        return constructRemainingPath("", pathComponents, 3);
+      }
+    } else {
+      /* It's some sort of /.reserved/<unknown> path. Ignore it. */
       return src;
     }
+  }
+
+  private static String resolveDotInodesPath(String src,
+      byte[][] pathComponents, FSDirectory fsd)
+      throws FileNotFoundException {
     final String inodeId = DFSUtil.bytes2String(pathComponents[3]);
     final long id;
     try {
@@ -2845,10 +3066,20 @@ public class FSDirectory implements Clos
       }
     }
 
-    StringBuilder path = id == INodeId.ROOT_INODE_ID ? new StringBuilder()
-        : new StringBuilder(inode.getFullPathName());
-    for (int i = 4; i < pathComponents.length; i++) {
-      path.append(Path.SEPARATOR).append(DFSUtil.bytes2String(pathComponents[i]));
+    String path = "";
+    if (id != INodeId.ROOT_INODE_ID) {
+      path = inode.getFullPathName();
+    }
+    return constructRemainingPath(path, pathComponents, 4);
+  }
+
+  private static String constructRemainingPath(String pathPrefix,
+      byte[][] pathComponents, int startAt) {
+
+    StringBuilder path = new StringBuilder(pathPrefix);
+    for (int i = startAt; i < pathComponents.length; i++) {
+      path.append(Path.SEPARATOR).append(
+          DFSUtil.bytes2String(pathComponents[i]));
     }
     if (NameNode.LOG.isDebugEnabled()) {
       NameNode.LOG.debug("Resolved path is " + path);
@@ -2893,7 +3124,7 @@ public class FSDirectory implements Clos
    * @throws UnresolvedLinkException if symlink can't be resolved
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
-  private INodesInPath getINodesInPath4Write(String src, boolean resolveLink)
+  INodesInPath getINodesInPath4Write(String src, boolean resolveLink)
           throws UnresolvedLinkException, SnapshotAccessControlException {
     final byte[][] components = INode.getPathComponents(src);
     INodesInPath inodesInPath = INodesInPath.resolve(rootDir, components,

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
 Thu Aug 21 05:22:10 2014
@@ -367,7 +367,8 @@ public class FSEditLogLoader {
         if (toAddRetryCache) {
           HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
               HdfsFileStatus.EMPTY_NAME, newFile,
-              BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID);
+              BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
+              false);
           fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
               addCloseOp.rpcCallId, stat);
         }

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 Thu Aug 21 05:22:10 2014
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
+    .EncryptedKeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@@ -102,6 +105,8 @@ import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
+import java.security.GeneralSecurityException;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -115,6 +120,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -130,12 +136,17 @@ import org.apache.commons.logging.impl.L
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
@@ -159,6 +170,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.UnknownCipherSuiteException;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -170,6 +182,8 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -325,7 +339,7 @@ public class FSNamesystem implements Nam
   private HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
       throws IOException {
     return (isAuditEnabled() && isExternalInvocation())
-        ? dir.getFileInfo(path, resolveSymlink) : null;
+        ? dir.getFileInfo(path, resolveSymlink, false) : null;
   }
   
   private void logAuditEvent(boolean succeeded, String cmd, String src)
@@ -411,6 +425,8 @@ public class FSNamesystem implements Nam
   private final CacheManager cacheManager;
   private final DatanodeStatistics datanodeStatistics;
 
+  private String nameserviceId;
+
   private RollingUpgradeInfo rollingUpgradeInfo = null;
   /**
    * A flag that indicates whether the checkpointer should checkpoint a 
rollback
@@ -526,6 +542,11 @@ public class FSNamesystem implements Nam
 
   private final NNConf nnConf;
 
+  private KeyProviderCryptoExtension provider = null;
+  private KeyProvider.Options providerOptions = null;
+
+  private final CryptoCodec codec;
+
   private volatile boolean imageLoaded = false;
   private final Condition cond;
 
@@ -745,6 +766,14 @@ public class FSNamesystem implements Nam
    */
   FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
       throws IOException {
+    provider = DFSUtil.createKeyProviderCryptoExtension(conf);
+    if (provider == null) {
+      LOG.info("No KeyProvider found.");
+    } else {
+      LOG.info("Found KeyProvider: " + provider.toString());
+    }
+    providerOptions = KeyProvider.options(conf);
+    this.codec = CryptoCodec.getInstance(conf);
     if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
                         DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
       LOG.info("Enabling async auditlog");
@@ -776,7 +805,7 @@ public class FSNamesystem implements Nam
 
       // block allocation has to be persisted in HA using a shared edits directory
       // so that the standby has up-to-date namespace information
-      String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+      nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
       this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);  
       
       // Sanity check the HA-related config.
@@ -903,6 +932,11 @@ public class FSNamesystem implements Nam
   }
 
+  /**
+   * Returns the KeyProviderCryptoExtension assigned in the constructor from
+   * DFSUtil.createKeyProviderCryptoExtension(conf).
+   *
+   * @return the key provider; null when none was found at construction time
+   */
   @VisibleForTesting
+  public KeyProviderCryptoExtension getProvider() {
+    return provider;
+  }
+
+  @VisibleForTesting
   static RetryCache initRetryCache(Configuration conf) {
     boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
         DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
@@ -1630,9 +1664,10 @@ public class FSNamesystem implements Nam
     }
   }
 
-  private void setPermissionInt(String src, FsPermission permission)
+  private void setPermissionInt(final String srcArg, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    String src = srcArg;
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -1641,7 +1676,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set permission for " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOwner(pc, src);
       dir.setPermission(src, permission);
       getEditLog().logSetPermissions(src, permission);
@@ -1650,7 +1685,7 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "setPermission", src, null, resultingStat);
+    logAuditEvent(true, "setPermission", srcArg, null, resultingStat);
   }
 
   /**
@@ -1668,9 +1703,10 @@ public class FSNamesystem implements Nam
     } 
   }
 
-  private void setOwnerInt(String src, String username, String group)
+  private void setOwnerInt(final String srcArg, String username, String group)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    String src = srcArg;
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -1679,7 +1715,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set owner for " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOwner(pc, src);
       if (!pc.isSuperUser()) {
         if (username != null && !pc.getUser().equals(username)) {
@@ -1696,7 +1732,7 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "setOwner", src, null, resultingStat);
+    logAuditEvent(true, "setOwner", srcArg, null, resultingStat);
   }
 
   /**
@@ -1779,10 +1815,11 @@ public class FSNamesystem implements Nam
    * Get block locations within the specified range, updating the
    * access times if necessary. 
    */
-  private LocatedBlocks getBlockLocationsUpdateTimes(String src, long offset,
-      long length, boolean doAccessTime, boolean needBlockToken)
+  private LocatedBlocks getBlockLocationsUpdateTimes(final String srcArg,
+      long offset, long length, boolean doAccessTime, boolean needBlockToken)
       throws FileNotFoundException,
       UnresolvedLinkException, IOException {
+    String src = srcArg;
     FSPermissionChecker pc = getPermissionChecker();
     byte[][] pathComponents = 
FSDirectory.getPathComponentsForReservedPath(src);
     for (int attempt = 0; attempt < 2; attempt++) {
@@ -1794,7 +1831,7 @@ public class FSNamesystem implements Nam
         checkOperation(OperationCategory.WRITE);
         writeLock(); // writelock is needed to set accesstime
       }
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       try {
         if (isReadOp) {
           checkOperation(OperationCategory.READ);
@@ -1838,9 +1875,14 @@ public class FSNamesystem implements Nam
           length = Math.min(length, fileSize - offset);
           isUc = false;
         }
-        LocatedBlocks blocks =
+
+        final FileEncryptionInfo feInfo =
+          FSDirectory.isReservedRawName(srcArg) ?
+          null : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId());
+
+        final LocatedBlocks blocks =
           blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
-            isUc, offset, length, needBlockToken, iip.isSnapshot());
+            isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
         // Set caching information for the located blocks.
         for (LocatedBlock lb: blocks.getLocatedBlocks()) {
           cacheManager.setCachedLocations(lb);
@@ -2061,8 +2103,9 @@ public class FSNamesystem implements Nam
     }
   }
 
-  private void setTimesInt(String src, long mtime, long atime) 
+  private void setTimesInt(final String srcArg, long mtime, long atime)
     throws IOException, UnresolvedLinkException {
+    String src = srcArg;
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -2071,7 +2114,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set times " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
 
       // Write access is required to set access and modification times
       if (isPermissionEnabled) {
@@ -2092,7 +2135,7 @@ public class FSNamesystem implements Nam
     } finally {
       writeUnlock();
     }
-    logAuditEvent(true, "setTimes", src, null, resultingStat);
+    logAuditEvent(true, "setTimes", srcArg, null, resultingStat);
   }
 
   /**
@@ -2123,9 +2166,10 @@ public class FSNamesystem implements Nam
     }
   }
 
-  private void createSymlinkInt(String target, String link,
+  private void createSymlinkInt(String target, final String linkArg,
       PermissionStatus dirPerms, boolean createParent, boolean logRetryCache) 
       throws IOException, UnresolvedLinkException {
+    String link = linkArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
           + target + " link=" + link);
@@ -2138,7 +2182,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create symlink " + link);
-      link = FSDirectory.resolvePath(link, pathComponents, dir);
+      link = resolvePath(link, pathComponents);
       if (!createParent) {
         verifyParentDir(link);
       }
@@ -2159,7 +2203,7 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "createSymlink", link, target, resultingStat);
+    logAuditEvent(true, "createSymlink", linkArg, target, resultingStat);
   }
 
   /**
@@ -2185,8 +2229,9 @@ public class FSNamesystem implements Nam
     }
   }
 
-  private boolean setReplicationInt(String src, final short replication)
-      throws IOException {
+  private boolean setReplicationInt(final String srcArg,
+      final short replication) throws IOException {
+    String src = srcArg;
     blockManager.verifyReplication(src, replication, null);
     final boolean isFile;
     FSPermissionChecker pc = getPermissionChecker();
@@ -2197,7 +2242,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set replication for " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       if (isPermissionEnabled) {
         checkPathAccess(pc, src, FsAction.WRITE);
       }
@@ -2215,7 +2260,7 @@ public class FSNamesystem implements Nam
 
     getEditLog().logSync();
     if (isFile) {
-      logAuditEvent(true, "setReplication", src);
+      logAuditEvent(true, "setReplication", srcArg);
     }
     return isFile;
   }
@@ -2274,7 +2319,7 @@ public class FSNamesystem implements Nam
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      filename = FSDirectory.resolvePath(filename, pathComponents, dir);
+      filename = resolvePath(filename, pathComponents);
       if (isPermissionEnabled) {
         checkTraverse(pc, filename);
       }
@@ -2302,7 +2347,74 @@ public class FSNamesystem implements Nam
       }
     }
   }
-  
+
+  /**
+   * If the file is within an encryption zone, select the appropriate
+   * CipherSuite from the list provided by the client. Since the client may
+   * be newer, need to handle unknown CipherSuites.
+   * <p>
+   * The client's list is treated as a preference order: the first entry that
+   * the NameNode also supports is chosen. UNKNOWN entries are skipped.
+   *
+   * @param srcIIP path of the file
+   * @param cipherSuites client-provided list of supported CipherSuites,
+   *                     in desired order; null is treated as an empty list
+   * @return chosen CipherSuite, or null if file is not in an EncryptionZone
+   * @throws UnknownCipherSuiteException if the file is in an encryption zone
+   *         and none of the client-provided suites is supported
+   */
+  private CipherSuite chooseCipherSuite(INodesInPath srcIIP, List<CipherSuite>
+      cipherSuites)
+      throws UnknownCipherSuiteException, UnresolvedLinkException,
+        SnapshotAccessControlException {
+    // Not in an EZ
+    if (!dir.isInAnEZ(srcIIP)) {
+      return null;
+    }
+    if (cipherSuites != null) {
+      final List<CipherSuite> supported = Arrays.asList(CipherSuite.values());
+      for (CipherSuite c : cipherSuites) {
+        if (c.equals(CipherSuite.UNKNOWN)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring unknown CipherSuite provided by client: "
+                + c.getUnknownValue());
+          }
+          continue;
+        }
+        // Return on the first match so the client's preference order is
+        // honored; continuing to scan would let a later entry win.
+        if (supported.contains(c)) {
+          return c;
+        }
+      }
+    }
+    final String clientSuites = (cipherSuites == null)
+        ? "null" : Arrays.toString(cipherSuites.toArray());
+    throw new UnknownCipherSuiteException(
+        "No cipher suites provided by the client are supported."
+            + " Client provided: " + clientSuites
+            + " NameNode supports: " + Arrays.toString(CipherSuite.values()));
+  }
+
+  /**
+   * Invoke KeyProvider APIs to generate an encrypted data encryption key
+   * (EDEK) for an encryption zone. Should not be called with any locks held.
+   *
+   * @param ezKeyName key name of an encryption zone
+   * @return New EDEK, or null if ezKeyName is null
+   * @throws IOException if no KeyProvider is configured, or if the provider
+   *         fails with a GeneralSecurityException (wrapped as the cause)
+   */
+  private EncryptedKeyVersion generateEncryptedDataEncryptionKey(String
+      ezKeyName) throws IOException {
+    if (ezKeyName == null) {
+      return null;
+    }
+    // The constructor only logs when no KeyProvider is found, so provider can
+    // be null here; report that explicitly instead of failing with an NPE.
+    if (provider == null) {
+      throw new IOException("Cannot generate an encrypted data encryption key"
+          + " for key " + ezKeyName + ": no KeyProvider is configured.");
+    }
+    final EncryptedKeyVersion edek;
+    try {
+      edek = provider.generateEncryptedKey(ezKeyName);
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+    Preconditions.checkNotNull(edek);
+    return edek;
+  }
+
   /**
    * Create a new file entry in the namespace.
    * 
@@ -2312,7 +2424,8 @@ public class FSNamesystem implements Nam
    */
   HdfsFileStatus startFile(String src, PermissionStatus permissions,
       String holder, String clientMachine, EnumSet<CreateFlag> flag,
-      boolean createParent, short replication, long blockSize)
+      boolean createParent, short replication, long blockSize, 
+      List<CipherSuite> cipherSuites)
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
@@ -2325,7 +2438,8 @@ public class FSNamesystem implements Nam
     
     try {
       status = startFileInt(src, permissions, holder, clientMachine, flag,
-          createParent, replication, blockSize, cacheEntry != null);
+          createParent, replication, blockSize, cipherSuites,
+          cacheEntry != null);
     } catch (AccessControlException e) {
       logAuditEvent(false, "create", src);
       throw e;
@@ -2335,19 +2449,30 @@ public class FSNamesystem implements Nam
     return status;
   }
 
-  private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
-      String holder, String clientMachine, EnumSet<CreateFlag> flag,
-      boolean createParent, short replication, long blockSize,
-      boolean logRetryCache) throws AccessControlException, SafeModeException,
+  private HdfsFileStatus startFileInt(final String srcArg,
+      PermissionStatus permissions, String holder, String clientMachine,
+      EnumSet<CreateFlag> flag, boolean createParent, short replication,
+      long blockSize, List<CipherSuite> cipherSuites, boolean logRetryCache)
+      throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
+    String src = srcArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
-          + ", holder=" + holder
-          + ", clientMachine=" + clientMachine
-          + ", createParent=" + createParent
-          + ", replication=" + replication
-          + ", createFlag=" + flag.toString());
+      StringBuilder builder = new StringBuilder();
+      builder.append("DIR* NameSystem.startFile: src=" + src
+              + ", holder=" + holder
+              + ", clientMachine=" + clientMachine
+              + ", createParent=" + createParent
+              + ", replication=" + replication
+              + ", createFlag=" + flag.toString()
+              + ", blockSize=" + blockSize);
+      builder.append(", cipherSuites=");
+      if (cipherSuites != null) {
+        builder.append(Arrays.toString(cipherSuites.toArray()));
+      } else {
+        builder.append("null");
+      }
+      NameNode.stateChangeLog.debug(builder.toString());
     }
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
@@ -2368,27 +2493,92 @@ public class FSNamesystem implements Nam
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
 
     waitForLoadingFSImage();
-    writeLock();
+
+    /*
+     * We want to avoid holding any locks while doing KeyProvider operations,
+     * since they can be very slow. Since the path can
+     * flip flop between being in an encryption zone and not in the meantime,
+     * we need to recheck the preconditions and redo KeyProvider operations
+     * in some situations.
+     *
+     * A special RetryStartFileException is used to indicate that we should
+     * retry creation of a FileEncryptionInfo.
+     */
     try {
-      checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot create file" + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
-      startFileInternal(pc, src, permissions, holder, clientMachine, create,
-          overwrite, createParent, replication, blockSize, logRetryCache);
-      stat = dir.getFileInfo(src, false);
-    } catch (StandbyException se) {
-      skipSync = true;
-      throw se;
+      boolean shouldContinue = true;
+      int iters = 0;
+      while (shouldContinue) {
+        skipSync = false;
+        if (iters >= 10) {
+          throw new IOException("Too many retries because of encryption zone " 
+
+              "operations, something might be broken!");
+        }
+        shouldContinue = false;
+        iters++;
+
+        // Optimistically determine CipherSuite and ezKeyName if the path is
+        // currently within an encryption zone
+        CipherSuite suite = null;
+        String ezKeyName = null;
+        readLock();
+        try {
+          src = resolvePath(src, pathComponents);
+          INodesInPath iip = dir.getINodesInPath4Write(src);
+          // Nothing to do if the path is not within an EZ
+          if (dir.isInAnEZ(iip)) {
+            suite = chooseCipherSuite(iip, cipherSuites);
+            if (suite != null) {
+              Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
+                  "Chose an UNKNOWN CipherSuite!");
+            }
+            ezKeyName = dir.getKeyName(iip);
+            Preconditions.checkState(ezKeyName != null);
+          }
+        } finally {
+          readUnlock();
+        }
+
+        Preconditions.checkState(
+            (suite == null && ezKeyName == null) ||
+            (suite != null && ezKeyName != null),
+            "Both suite and ezKeyName should both be null or not null");
+        // Generate EDEK if necessary while not holding the lock
+        EncryptedKeyVersion edek =
+            generateEncryptedDataEncryptionKey(ezKeyName);
+        EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
+        // Try to create the file with the computed cipher suite and EDEK
+        writeLock();
+        try {
+          checkOperation(OperationCategory.WRITE);
+          checkNameNodeSafeMode("Cannot create file" + src);
+          src = resolvePath(src, pathComponents);
+          startFileInternal(pc, src, permissions, holder, clientMachine, 
create,
+              overwrite, createParent, replication, blockSize, suite, edek,
+              logRetryCache);
+          stat = dir.getFileInfo(src, false,
+              FSDirectory.isReservedRawName(srcArg));
+        } catch (StandbyException se) {
+          skipSync = true;
+          throw se;
+        } catch (RetryStartFileException e) {
+          shouldContinue = true;
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Preconditions failed, retrying creation of " +
+                    "FileEncryptionInfo", e);
+          }
+        } finally {
+          writeUnlock();
+        }
+      }
     } finally {
-      writeUnlock();
       // There might be transactions logged while trying to recover the lease.
       // They need to be sync'ed even when an exception was thrown.
       if (!skipSync) {
         getEditLog().logSync();
       }
-    } 
+    }
 
-    logAuditEvent(true, "create", src, null, stat);
+    logAuditEvent(true, "create", srcArg, null, stat);
     return stat;
   }
 
@@ -2404,10 +2594,11 @@ public class FSNamesystem implements Nam
   private void startFileInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, String holder, String clientMachine,
       boolean create, boolean overwrite, boolean createParent,
-      short replication, long blockSize, boolean logRetryEntry)
+      short replication, long blockSize, CipherSuite suite,
+      EncryptedKeyVersion edek, boolean logRetryEntry)
       throws FileAlreadyExistsException, AccessControlException,
       UnresolvedLinkException, FileNotFoundException,
-      ParentNotDirectoryException, IOException {
+      ParentNotDirectoryException, RetryStartFileException, IOException {
     assert hasWriteLock();
     // Verify that the destination does not exist as a directory already.
     final INodesInPath iip = dir.getINodesInPath4Write(src);
@@ -2416,6 +2607,26 @@ public class FSNamesystem implements Nam
       throw new FileAlreadyExistsException(src +
           " already exists as a directory");
     }
+
+    FileEncryptionInfo feInfo = null;
+    if (dir.isInAnEZ(iip)) {
+      // The path is now within an EZ, but we're missing encryption parameters
+      if (suite == null || edek == null) {
+        throw new RetryStartFileException();
+      }
+      // Path is within an EZ and we have provided encryption parameters.
+      // Make sure that the generated EDEK matches the settings of the EZ.
+      String ezKeyName = dir.getKeyName(iip);
+      if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
+        throw new RetryStartFileException();
+      }
+      feInfo = new FileEncryptionInfo(suite,
+          edek.getEncryptedKeyVersion().getMaterial(),
+          edek.getEncryptedKeyIv(),
+          edek.getEncryptionKeyVersionName());
+      Preconditions.checkNotNull(feInfo);
+    }
+
     final INodeFile myFile = INodeFile.valueOf(inode, src, true);
     if (isPermissionEnabled) {
       if (overwrite && myFile != null) {
@@ -2468,6 +2679,12 @@ public class FSNamesystem implements Nam
       leaseManager.addLease(newNode.getFileUnderConstructionFeature()
           .getClientName(), src);
 
+      // Set encryption attributes if necessary
+      if (feInfo != null) {
+        dir.setFileEncryptionInfo(src, feInfo);
+        newNode = dir.getInode(newNode.getId()).asFile();
+      }
+
       // record file record in log, record new generation stamp
       getEditLog().logOpenFile(src, newNode, logRetryEntry);
       if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2480,7 +2697,7 @@ public class FSNamesystem implements Nam
       throw ie;
     }
   }
-  
+
   /**
    * Append to an existing file for append.
    * <p>
@@ -2606,7 +2823,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot recover the lease of " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
       if (!inode.isUnderConstruction()) {
         return true;
@@ -2733,11 +2950,12 @@ public class FSNamesystem implements Nam
     }
   }
 
-  private LocatedBlock appendFileInt(String src, String holder,
+  private LocatedBlock appendFileInt(final String srcArg, String holder,
       String clientMachine, boolean logRetryCache)
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    String src = srcArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
           + ", holder=" + holder
@@ -2752,7 +2970,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot append to file" + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
     } catch (StandbyException se) {
       skipSync = true;
@@ -2773,7 +2991,7 @@ public class FSNamesystem implements Nam
             +" block size " + lb.getBlock().getNumBytes());
       }
     }
-    logAuditEvent(true, "append", src);
+    logAuditEvent(true, "append", srcArg);
     return lb;
   }
 
@@ -2819,7 +3037,7 @@ public class FSNamesystem implements Nam
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
       FileState fileState = analyzeFileState(
           src, fileId, clientName, previous, onRetryBlock);
@@ -3045,7 +3263,7 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.READ);
       //check safe mode
       checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + 
blk);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
 
       //check lease
       final INode inode;
@@ -3098,7 +3316,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
 
       final INode inode;
       if (fileId == INodeId.GRANDFATHER_INODE_ID) {
@@ -3180,9 +3398,10 @@ public class FSNamesystem implements Nam
    *         (e.g if not all blocks have reached minimum replication yet)
    * @throws IOException on error (eg lease mismatch, file not open, file deleted)
    */
-  boolean completeFile(String src, String holder,
+  boolean completeFile(final String srcArg, String holder,
                        ExtendedBlock last, long fileId)
     throws SafeModeException, UnresolvedLinkException, IOException {
+    String src = srcArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
           src + " for " + holder);
@@ -3196,7 +3415,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot complete file " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       success = completeFileInternal(src, holder,
         ExtendedBlock.getLocalBlock(last), fileId);
     } finally {
@@ -3204,7 +3423,7 @@ public class FSNamesystem implements Nam
     }
     getEditLog().logSync();
     if (success) {
-      NameNode.stateChangeLog.info("DIR* completeFile: " + src
+      NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
           + " is closed by " + holder);
     }
     return success;
@@ -3372,8 +3591,11 @@ public class FSNamesystem implements Nam
     return ret;
   }
 
-  private boolean renameToInt(String src, String dst, boolean logRetryCache) 
+  private boolean renameToInt(final String srcArg, final String dstArg,
+    boolean logRetryCache)
     throws IOException, UnresolvedLinkException {
+    String src = srcArg;
+    String dst = dstArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
@@ -3392,8 +3614,8 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename " + src);
       waitForLoadingFSImage();
-      src = FSDirectory.resolvePath(src, srcComponents, dir);
-      dst = FSDirectory.resolvePath(dst, dstComponents, dir);
+      src = resolvePath(src, srcComponents);
+      dst = resolvePath(dst, dstComponents);
       checkOperation(OperationCategory.WRITE);
       status = renameToInternal(pc, src, dst, logRetryCache);
       if (status) {
@@ -3404,7 +3626,7 @@ public class FSNamesystem implements Nam
     }
     getEditLog().logSync();
     if (status) {
-      logAuditEvent(true, "rename", src, dst, resultingStat);
+      logAuditEvent(true, "rename", srcArg, dstArg, resultingStat);
     }
     return status;
   }
@@ -3442,8 +3664,10 @@ public class FSNamesystem implements Nam
   
 
   /** Rename src to dst */
-  void renameTo(String src, String dst, Options.Rename... options)
-      throws IOException, UnresolvedLinkException {
+  void renameTo(final String srcArg, final String dstArg,
+      Options.Rename... options) throws IOException, UnresolvedLinkException {
+    String src = srcArg;
+    String dst = dstArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
@@ -3466,8 +3690,8 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename " + src);
-      src = FSDirectory.resolvePath(src, srcComponents, dir);
-      dst = FSDirectory.resolvePath(dst, dstComponents, dir);
+      src = resolvePath(src, srcComponents);
+      dst = resolvePath(dst, dstComponents);
       renameToInternal(pc, src, dst, cacheEntry != null, options);
       resultingStat = getAuditFileInfo(dst, false);
       success = true;
@@ -3481,7 +3705,7 @@ public class FSNamesystem implements Nam
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
       }
-      logAuditEvent(true, cmd.toString(), src, dst, resultingStat);
+      logAuditEvent(true, cmd.toString(), srcArg, dstArg, resultingStat);
     }
   }
 
@@ -3579,7 +3803,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       if (!recursive && dir.isNonEmptyDirectory(src)) {
         throw new PathIsNotEmptyDirectoryException(src + " is non empty");
       }
@@ -3587,6 +3811,7 @@ public class FSNamesystem implements Nam
         checkPermission(pc, src, false, null, FsAction.WRITE, null,
             FsAction.ALL, true, false);
       }
+
       long mtime = now();
       // Unlink the target directory from directory tree
       long filesRemoved = dir.delete(src, collectedBlocks, removedINodes,
@@ -3722,7 +3947,7 @@ public class FSNamesystem implements Nam
   /**
    * Get the file info for a specific file.
    *
-   * @param src The string representation of the path to the file
+   * @param srcArg The string representation of the path to the file
    * @param resolveLink whether to throw UnresolvedLinkException 
    *        if src refers to a symlink
    *
@@ -3733,9 +3958,10 @@ public class FSNamesystem implements Nam
    *         or null if file not found
    * @throws StandbyException 
    */
-  HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
+  HdfsFileStatus getFileInfo(final String srcArg, boolean resolveLink)
     throws AccessControlException, UnresolvedLinkException,
            StandbyException, IOException {
+    String src = srcArg;
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException("Invalid file name: " + src);
     }
@@ -3746,34 +3972,36 @@ public class FSNamesystem implements Nam
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       if (isPermissionEnabled) {
         checkPermission(pc, src, false, null, null, null, null, false,
             resolveLink);
       }
-      stat = dir.getFileInfo(src, resolveLink);
+      stat = dir.getFileInfo(src, resolveLink,
+          FSDirectory.isReservedRawName(srcArg));
     } catch (AccessControlException e) {
-      logAuditEvent(false, "getfileinfo", src);
+      logAuditEvent(false, "getfileinfo", srcArg);
       throw e;
     } finally {
       readUnlock();
     }
-    logAuditEvent(true, "getfileinfo", src);
+    logAuditEvent(true, "getfileinfo", srcArg);
     return stat;
   }
   
   /**
    * Returns true if the file is closed
    */
-  boolean isFileClosed(String src) 
+  boolean isFileClosed(final String srcArg)
       throws AccessControlException, UnresolvedLinkException,
       StandbyException, IOException {
+    String src = srcArg;
     FSPermissionChecker pc = getPermissionChecker();  
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = 
FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         checkTraverse(pc, src);
@@ -3781,7 +4009,7 @@ public class FSNamesystem implements Nam
       return !INodeFile.valueOf(dir.getINode(src), src).isUnderConstruction();
     } catch (AccessControlException e) {
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(false, "isFileClosed", src);
+        logAuditEvent(false, "isFileClosed", srcArg);
       }
       throw e;
     } finally {
@@ -3804,8 +4032,9 @@ public class FSNamesystem implements Nam
     return ret;
   }
 
-  private boolean mkdirsInt(String src, PermissionStatus permissions,
+  private boolean mkdirsInt(final String srcArg, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
+    String src = srcArg;
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
@@ -3821,7 +4050,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);   
       checkNameNodeSafeMode("Cannot create directory " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       status = mkdirsInternal(pc, src, permissions, createParent);
       if (status) {
         resultingStat = getAuditFileInfo(src, false);
@@ -3831,7 +4060,7 @@ public class FSNamesystem implements Nam
     }
     getEditLog().logSync();
     if (status) {
-      logAuditEvent(true, "mkdirs", src, null, resultingStat);
+      logAuditEvent(true, "mkdirs", srcArg, null, resultingStat);
     }
     return status;
   }
@@ -3989,7 +4218,8 @@ public class FSNamesystem implements Nam
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  ContentSummary getContentSummary(String src) throws IOException {
+  ContentSummary getContentSummary(final String srcArg) throws IOException {
+    String src = srcArg;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
@@ -3997,7 +4227,7 @@ public class FSNamesystem implements Nam
     boolean success = true;
     try {
       checkOperation(OperationCategory.READ);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       if (isPermissionEnabled) {
         checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
       }
@@ -4008,7 +4238,7 @@ public class FSNamesystem implements Nam
       throw ace;
     } finally {
       readUnlock();
-      logAuditEvent(success, "contentSummary", src);
+      logAuditEvent(success, "contentSummary", srcArg);
     }
   }
 
@@ -4059,7 +4289,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot fsync file " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       final INode inode;
       if (fileId == INodeId.GRANDFATHER_INODE_ID) {
         // Older clients may not have given us an inode ID to work with.
@@ -4525,9 +4755,10 @@ public class FSNamesystem implements Nam
     }
   }
 
-  private DirectoryListing getListingInt(String src, byte[] startAfter,
-      boolean needLocation) 
+  private DirectoryListing getListingInt(final String srcArg, byte[] startAfter,
+      boolean needLocation)
     throws AccessControlException, UnresolvedLinkException, IOException {
+    String src = srcArg;
     DirectoryListing dl;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
@@ -4536,7 +4767,7 @@ public class FSNamesystem implements Nam
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
 
       // Get file name when startAfter is an INodePath
       if (FSDirectory.isReservedName(startAfterString)) {
@@ -4562,7 +4793,7 @@ public class FSNamesystem implements Nam
         }
         isSuperUser = pc.isSuperUser();
       }
-      logAuditEvent(true, "listStatus", src);
+      logAuditEvent(true, "listStatus", srcArg);
       dl = dir.getListing(src, startAfter, needLocation, isSuperUser);
     } finally {
       readUnlock();
@@ -5977,6 +6208,28 @@ public class FSNamesystem implements Nam
     checkPermission(pc, path, false, null, null, null, null);
   }
 
+  /**
+   * This is a wrapper for FSDirectory.resolvePath(). If the path passed
+   * is prefixed with /.reserved/raw, then it checks to ensure that the caller
+   * has superuser privileges.
+   *
+   * @param path The path to resolve.
+   * @param pathComponents path components corresponding to the path
+   * @return if the path indicates an inode, return path after replacing up to
+   *         <inodeid> with the corresponding path of the inode, else the path
+   *         in {@code path} as is. If the path refers to a path in the "raw"
+   *         directory, return the non-raw pathname.
+   * @throws FileNotFoundException
+   * @throws AccessControlException if a raw path is used by a non-superuser
+   */
+  private String resolvePath(String path, byte[][] pathComponents)
+      throws FileNotFoundException, AccessControlException {
+    if (FSDirectory.isReservedRawName(path)) {
+      checkSuperuserPrivilege();
+    }
+    return FSDirectory.resolvePath(path, pathComponents, dir);
+  }
+
   @Override
   public void checkSuperuserPrivilege()
       throws AccessControlException {
@@ -8198,7 +8451,9 @@ public class FSNamesystem implements Nam
     return results;
   }
 
-  void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+  void modifyAclEntries(final String srcArg, List<AclEntry> aclSpec)
+      throws IOException {
+    String src = srcArg;
     nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
@@ -8208,7 +8463,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOwner(pc, src);
       List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
       getEditLog().logSetAcl(src, newAcl);
@@ -8217,10 +8472,12 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "modifyAclEntries", src, null, resultingStat);
+    logAuditEvent(true, "modifyAclEntries", srcArg, null, resultingStat);
   }
 
-  void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+  void removeAclEntries(final String srcArg, List<AclEntry> aclSpec)
+      throws IOException {
+    String src = srcArg;
     nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
@@ -8230,7 +8487,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOwner(pc, src);
       List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
       getEditLog().logSetAcl(src, newAcl);
@@ -8239,10 +8496,11 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "removeAclEntries", src, null, resultingStat);
+    logAuditEvent(true, "removeAclEntries", srcArg, null, resultingStat);
   }
 
-  void removeDefaultAcl(String src) throws IOException {
+  void removeDefaultAcl(final String srcArg) throws IOException {
+    String src = srcArg;
     nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
@@ -8252,7 +8510,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOwner(pc, src);
       List<AclEntry> newAcl = dir.removeDefaultAcl(src);
       getEditLog().logSetAcl(src, newAcl);
@@ -8261,10 +8519,11 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "removeDefaultAcl", src, null, resultingStat);
+    logAuditEvent(true, "removeDefaultAcl", srcArg, null, resultingStat);
   }
 
-  void removeAcl(String src) throws IOException {
+  void removeAcl(final String srcArg) throws IOException {
+    String src = srcArg;
     nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
@@ -8274,7 +8533,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL on " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOwner(pc, src);
       dir.removeAcl(src);
       getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
@@ -8283,10 +8542,11 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "removeAcl", src, null, resultingStat);
+    logAuditEvent(true, "removeAcl", srcArg, null, resultingStat);
   }
 
-  void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
+  void setAcl(final String srcArg, List<AclEntry> aclSpec) throws IOException {
+    String src = srcArg;
     nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
@@ -8296,7 +8556,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set ACL on " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOwner(pc, src);
       List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
       getEditLog().logSetAcl(src, newAcl);
@@ -8305,7 +8565,7 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "setAcl", src, null, resultingStat);
+    logAuditEvent(true, "setAcl", srcArg, null, resultingStat);
   }
 
   AclStatus getAclStatus(String src) throws IOException {
@@ -8316,7 +8576,7 @@ public class FSNamesystem implements Nam
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       if (isPermissionEnabled) {
         checkPermission(pc, src, false, null, null, null, null);
       }
@@ -8325,7 +8585,140 @@ public class FSNamesystem implements Nam
       readUnlock();
     }
   }
-  
+
+  /**
+   * Create an encryption zone on directory src using the specified key.
+   *
+   * @param src     the path of a directory which will be the root of the
+   *                encryption zone. The directory must be empty.
+   * @param keyName name of a key which must be present in the configured
+   *                KeyProvider.
+   * @throws AccessControlException  if the caller is not the superuser.
+   * @throws UnresolvedLinkException if the path can't be resolved.
+   * @throws SafeModeException       if the Namenode is in safe mode.
+   */
+  void createEncryptionZone(final String src, final String keyName)
+    throws IOException, UnresolvedLinkException,
+      SafeModeException, AccessControlException {
+    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+
+    boolean success = false;
+    try {
+      if (provider == null) {
+        throw new IOException(
+            "Can't create an encryption zone for " + src +
+            " since no key provider is available.");
+      }
+      if (keyName == null || keyName.isEmpty()) {
+        throw new IOException("Must specify a key name when creating an " +
+            "encryption zone");
+      }
+      KeyVersion keyVersion = provider.getCurrentKey(keyName);
+      if (keyVersion == null) {
+        /*
+         * It would be nice if we threw something more specific than
+         * IOException when the key is not found, but the KeyProvider API
+         * doesn't provide for that. If that API is ever changed to throw
+         * something more specific (e.g. UnknownKeyException) then we can
+         * update this to match it, or better yet, just rethrow the
+         * KeyProvider's exception.
+         */
+        throw new IOException("Key " + keyName + " doesn't exist.");
+      }
+      createEncryptionZoneInt(src, keyName, cacheEntry != null);
+      success = true;
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "createEncryptionZone", src);
+      throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
+    }
+  }
+
+  private void createEncryptionZoneInt(final String srcArg, String keyName,
+      final boolean logRetryCache) throws IOException {
+    String src = srcArg;
+    HdfsFileStatus resultingStat = null;
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
+    final byte[][] pathComponents =
+      FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot create encryption zone on " + src);
+      src = resolvePath(src, pathComponents);
+
+      final XAttr ezXAttr = dir.createEncryptionZone(src, keyName);
+      List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+      xAttrs.add(ezXAttr);
+      getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
+      resultingStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "createEncryptionZone", srcArg, null, resultingStat);
+  }
+
+  /**
+   * Get the encryption zone for the specified path.
+   *
+   * @param srcArg the path of a file or directory to get the EZ for.
+   * @return the EZ of the path or null if none.
+   * @throws AccessControlException  if the caller may not read the path.
+   * @throws UnresolvedLinkException if the path can't be resolved.
+   */
+  EncryptionZoneWithId getEZForPath(final String srcArg)
+    throws AccessControlException, UnresolvedLinkException, IOException {
+    String src = srcArg;
+    HdfsFileStatus resultingStat = null;
+    final byte[][] pathComponents =
+        FSDirectory.getPathComponentsForReservedPath(src);
+    boolean success = false;
+    final FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        checkPathAccess(pc, src, FsAction.READ);
+      }
+      checkOperation(OperationCategory.READ);
+      src = resolvePath(src, pathComponents);
+      final INodesInPath iip = dir.getINodesInPath(src, true);
+      final EncryptionZoneWithId ret = dir.getEZForPath(iip);
+      resultingStat = getAuditFileInfo(src, false);
+      success = true;
+      return ret;
+    } finally {
+      readUnlock();
+      logAuditEvent(success, "getEZForPath", srcArg, null, resultingStat);
+    }
+  }
+
+  BatchedListEntries<EncryptionZoneWithId> listEncryptionZones(long prevId)
+      throws IOException {
+    boolean success = false;
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.READ);
+      final BatchedListEntries<EncryptionZoneWithId> ret =
+          dir.listEncryptionZones(prevId);
+      success = true;
+      return ret;
+    } finally {
+      readUnlock();
+      logAuditEvent(success, "listEncryptionZones", null);
+    }
+  }
+
   /**
    * Set xattr for a file or directory.
    * 
@@ -8359,20 +8752,22 @@ public class FSNamesystem implements Nam
     }
   }
   
-  private void setXAttrInt(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
-      boolean logRetryCache) throws IOException {
+  private void setXAttrInt(final String srcArg, XAttr xAttr,
+      EnumSet<XAttrSetFlag> flag, boolean logRetryCache) throws IOException {
+    String src = srcArg;
     nnConf.checkXAttrsConfigFlag();
     checkXAttrSize(xAttr);
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
-    XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
+    XAttrPermissionFilter.checkPermissionForApi(pc, xAttr,
+        FSDirectory.isReservedRawName(src));
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set XAttr on " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkXAttrChangeAccess(src, xAttr, pc);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       xAttrs.add(xAttr);
@@ -8383,7 +8778,7 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "setXAttr", src, null, resultingStat);
+    logAuditEvent(true, "setXAttr", srcArg, null, resultingStat);
   }
 
   /**
@@ -8406,15 +8801,18 @@ public class FSNamesystem implements Nam
     }
   }
   
-  List<XAttr> getXAttrs(String src, List<XAttr> xAttrs) throws IOException {
+  List<XAttr> getXAttrs(final String srcArg, List<XAttr> xAttrs)
+      throws IOException {
+    String src = srcArg;
     nnConf.checkXAttrsConfigFlag();
     FSPermissionChecker pc = getPermissionChecker();
+    final boolean isRawPath = FSDirectory.isReservedRawName(src);
     boolean getAll = xAttrs == null || xAttrs.isEmpty();
     if (!getAll) {
       try {
-        XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs);
+        XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs, isRawPath);
       } catch (AccessControlException e) {
-        logAuditEvent(false, "getXAttrs", src);
+        logAuditEvent(false, "getXAttrs", srcArg);
         throw e;
       }
     }
@@ -8422,14 +8820,14 @@ public class FSNamesystem implements Nam
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         checkPathAccess(pc, src, FsAction.READ);
       }
       List<XAttr> all = dir.getXAttrs(src);
       List<XAttr> filteredAll = XAttrPermissionFilter.
-          filterXAttrsForApi(pc, all);
+          filterXAttrsForApi(pc, all, isRawPath);
       if (getAll) {
         return filteredAll;
       } else {
@@ -8455,7 +8853,7 @@ public class FSNamesystem implements Nam
         return toGet;
       }
     } catch (AccessControlException e) {
-      logAuditEvent(false, "getXAttrs", src);
+      logAuditEvent(false, "getXAttrs", srcArg);
       throw e;
     } finally {
       readUnlock();
@@ -8465,11 +8863,12 @@ public class FSNamesystem implements Nam
   List<XAttr> listXAttrs(String src) throws IOException {
     nnConf.checkXAttrsConfigFlag();
     final FSPermissionChecker pc = getPermissionChecker();
+    final boolean isRawPath = FSDirectory.isReservedRawName(src);
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         /* To access xattr names, you need EXECUTE in the owning directory. */
@@ -8477,7 +8876,7 @@ public class FSNamesystem implements Nam
       }
       final List<XAttr> all = dir.getXAttrs(src);
       final List<XAttr> filteredAll = XAttrPermissionFilter.
-        filterXAttrsForApi(pc, all);
+        filterXAttrsForApi(pc, all, isRawPath);
       return filteredAll;
     } catch (AccessControlException e) {
       logAuditEvent(false, "listXAttrs", src);
@@ -8516,19 +8915,21 @@ public class FSNamesystem implements Nam
     }
   }
 
-  void removeXAttrInt(String src, XAttr xAttr, boolean logRetryCache)
+  void removeXAttrInt(final String srcArg, XAttr xAttr, boolean logRetryCache)
       throws IOException {
+    String src = srcArg;
     nnConf.checkXAttrsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
-      XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
+    XAttrPermissionFilter.checkPermissionForApi(pc, xAttr,
+        FSDirectory.isReservedRawName(src));
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      src = resolvePath(src, pathComponents);
       checkXAttrChangeAccess(src, xAttr, pc);
 
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
@@ -8545,7 +8946,7 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "removeXAttr", src, null, resultingStat);
+    logAuditEvent(true, "removeXAttr", srcArg, null, resultingStat);
   }
 
   private void checkXAttrChangeAccess(String src, XAttr xAttr,

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 Thu Aug 21 05:22:10 2014
@@ -37,6 +37,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -77,6 +78,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -535,7 +537,8 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public HdfsFileStatus create(String src, FsPermission masked,
       String clientName, EnumSetWritable<CreateFlag> flag,
-      boolean createParent, short replication, long blockSize)
+      boolean createParent, short replication, long blockSize, 
+      List<CipherSuite> cipherSuites)
       throws IOException {
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
@@ -549,7 +552,7 @@ class NameNodeRpcServer implements Namen
     HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
         getRemoteUser().getShortUserName(), null, masked),
         clientName, clientMachine, flag.get(), createParent, replication,
-        blockSize);
+        blockSize, cipherSuites);
     metrics.incrFilesCreated();
     metrics.incrCreateFileOps();
     return fileStatus;
@@ -1430,6 +1433,24 @@ class NameNodeRpcServer implements Namen
   }
   
   @Override
+  public void createEncryptionZone(String src, String keyName)
+    throws IOException {
+    namesystem.createEncryptionZone(src, keyName);
+  }
+
+  @Override
+  public EncryptionZoneWithId getEZForPath(String src)
+    throws IOException {
+    return namesystem.getEZForPath(src);
+  }
+
+  @Override
+  public BatchedEntries<EncryptionZoneWithId> listEncryptionZones(
+      long prevId) throws IOException {
+    return namesystem.listEncryptionZones(prevId);
+  }
+
+  @Override
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
     namesystem.setXAttr(src, xAttr, flag);


Reply via email to