Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
@@ -30,19 +32,21 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
@@ -58,8 +62,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
 
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -115,14 +117,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
@@ -132,6 +137,7 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -149,16 +155,19 @@ import org.apache.hadoop.hdfs.protocol.S
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -166,6 +175,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
@@ -191,6 +201,7 @@ import org.apache.hadoop.util.Time;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.net.InetAddresses;
 
 /********************************************************
@@ -205,7 +216,8 @@ import com.google.common.net.InetAddress
  *
  ********************************************************/
 @InterfaceAudience.Private
-public class DFSClient implements java.io.Closeable, RemotePeerFactory {
+public class DFSClient implements java.io.Closeable, RemotePeerFactory,
+    DataEncryptionKeyFactory {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
   static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
@@ -229,7 +241,7 @@ public class DFSClient implements java.i
   private final Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
-  final TrustedChannelResolver trustedChannelResolver;
+  final SaslDataTransferClient saslClient;
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
   private final ClientContext clientContext;
@@ -274,6 +286,7 @@ public class DFSClient implements java.i
     final int retryTimesForGetLastBlockLength;
     final int retryIntervalForGetLastBlockLength;
     final long datanodeRestartTimeout;
+    final long dfsclientSlowIoWarningThresholdMs;
 
     final boolean useLegacyBlockReader;
     final boolean useLegacyBlockReaderLocal;
@@ -428,6 +441,9 @@ public class DFSClient implements java.i
       datanodeRestartTimeout = conf.getLong(
           DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
           DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
+      dfsclientSlowIoWarningThresholdMs = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
+          DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
     }
 
     public boolean isUseLegacyBlockReaderLocal() {
@@ -502,8 +518,8 @@ public class DFSClient implements java.i
    * that are currently being written by this client.
    * Note that a file can only be written by a single client.
    */
-  private final Map<String, DFSOutputStream> filesBeingWritten
-      = new HashMap<String, DFSOutputStream>();
+  private final Map<Long, DFSOutputStream> filesBeingWritten
+      = new HashMap<Long, DFSOutputStream>();
 
   /**
    * Same as this(NameNode.getAddress(conf), conf);
@@ -627,7 +643,12 @@ public class DFSClient implements java.i
     if (numThreads > 0) {
       this.initThreadsNumForHedgedReads(numThreads);
     }
-    this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConfiguration());
+    this.saslClient = new SaslDataTransferClient(
+      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      TrustedChannelResolver.getInstance(conf),
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
   }
   
   /**
@@ -732,14 +753,14 @@ public class DFSClient implements java.i
   }
 
   /** Get a lease and start automatic renewal */
-  private void beginFileLease(final String src, final DFSOutputStream out) 
+  private void beginFileLease(final long inodeId, final DFSOutputStream out)
       throws IOException {
-    getLeaseRenewer().put(src, out, this);
+    getLeaseRenewer().put(inodeId, out, this);
   }
 
   /** Stop renewal of lease for the file. */
-  void endFileLease(final String src) throws IOException {
-    getLeaseRenewer().closeFile(src, this);
+  void endFileLease(final long inodeId) throws IOException {
+    getLeaseRenewer().closeFile(inodeId, this);
   }
     
 
@@ -747,9 +768,9 @@ public class DFSClient implements java.i
    *  enforced to consistently update its local dfsclients array and 
    *  client's filesBeingWritten map.
    */
-  void putFileBeingWritten(final String src, final DFSOutputStream out) {
+  void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
     synchronized(filesBeingWritten) {
-      filesBeingWritten.put(src, out);
+      filesBeingWritten.put(inodeId, out);
       // update the last lease renewal time only when there was no
       // writes. once there is one write stream open, the lease renewer
       // thread keeps it updated well with in anyone's expiration time.
@@ -760,9 +781,9 @@ public class DFSClient implements java.i
   }
 
   /** Remove a file. Only called from LeaseRenewer. */
-  void removeFileBeingWritten(final String src) {
+  void removeFileBeingWritten(final long inodeId) {
     synchronized(filesBeingWritten) {
-      filesBeingWritten.remove(src);
+      filesBeingWritten.remove(inodeId);
       if (filesBeingWritten.isEmpty()) {
         lastLeaseRenewal = 0;
       }
@@ -847,14 +868,14 @@ public class DFSClient implements java.i
   /** Close/abort all files being written. */
   private void closeAllFilesBeingWritten(final boolean abort) {
     for(;;) {
-      final String src;
+      final long inodeId;
       final DFSOutputStream out;
       synchronized(filesBeingWritten) {
         if (filesBeingWritten.isEmpty()) {
           return;
         }
-        src = filesBeingWritten.keySet().iterator().next();
-        out = filesBeingWritten.remove(src);
+        inodeId = filesBeingWritten.keySet().iterator().next();
+        out = filesBeingWritten.remove(inodeId);
       }
       if (out != null) {
         try {
@@ -864,8 +885,8 @@ public class DFSClient implements java.i
             out.close();
           }
         } catch(IOException ie) {
-          LOG.error("Failed to " + (abort? "abort": "close") + " file " + src,
-              ie);
+          LOG.error("Failed to " + (abort? "abort": "close") +
+                  " inode " + inodeId, ie);
         }
       }
     }
@@ -1463,7 +1484,7 @@ public class DFSClient implements java.i
     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
         src, masked, flag, createParent, replication, blockSize, progress,
         buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
-    beginFileLease(src, result);
+    beginFileLease(result.getFileId(), result);
     return result;
   }
   
@@ -1511,7 +1532,7 @@ public class DFSClient implements java.i
           flag, createParent, replication, blockSize, progress, buffersize,
           checksum);
     }
-    beginFileLease(src, result);
+    beginFileLease(result.getFileId(), result);
     return result;
   }
   
@@ -1599,14 +1620,14 @@ public class DFSClient implements java.i
           + src + " on client " + clientName);
     }
     final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
-    beginFileLease(src, result);
+    beginFileLease(result.getFileId(), result);
     return result;
   }
 
   /**
    * Set replication for an existing file.
    * @param src file name
-   * @param replication
+   * @param replication replication to set the file to
    * 
    * @see ClientProtocol#setReplication(String, short)
    */
@@ -1797,19 +1818,6 @@ public class DFSClient implements java.i
                                      UnresolvedPathException.class);
      }
    }
-
-  /**
-   * Get the checksum of a file.
-   * @param src The file path
-   * @return The checksum 
-   * @see DistributedFileSystem#getFileChecksum(Path)
-   */
-  public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
-    checkOpen();
-    return getFileChecksum(src, clientName, namenode, socketFactory,
-        dfsClientConf.socketTimeout, getDataEncryptionKey(),
-        dfsClientConf.connectToDnViaHostname);
-  }
   
   @InterfaceAudience.Private
   public void clearDataEncryptionKey() {
@@ -1829,11 +1837,9 @@ public class DFSClient implements java.i
     return d == null ? false : d.getEncryptDataTransfer();
   }
   
-  @InterfaceAudience.Private
-  public DataEncryptionKey getDataEncryptionKey()
-      throws IOException {
-    if (shouldEncryptData() && 
-        !this.trustedChannelResolver.isTrusted()) {
+  @Override
+  public DataEncryptionKey newDataEncryptionKey() throws IOException {
+    if (shouldEncryptData()) {
       synchronized (this) {
         if (encryptionKey == null ||
             encryptionKey.expiryDate < Time.now()) {
@@ -1848,23 +1854,20 @@ public class DFSClient implements java.i
   }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file. Note that the
+   * range always starts from the beginning of the file.
    * @param src The file path
-   * @param clientName the name of the client requesting the checksum.
-   * @param namenode the RPC proxy for the namenode
-   * @param socketFactory to create sockets to connect to DNs
-   * @param socketTimeout timeout to use when connecting and waiting for a response
-   * @param encryptionKey the key needed to communicate with DNs in this cluster
-   * @param connectToDnViaHostname whether the client should use hostnames instead of IPs
+   * @param length the length of the range, i.e., the range is [0, length]
    * @return The checksum 
+   * @see DistributedFileSystem#getFileChecksum(Path)
    */
-  private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
-      String clientName,
-      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
-      DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
       throws IOException {
-    //get all block locations
-    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+    checkOpen();
+    Preconditions.checkArgument(length >= 0);
+    //get block locations for the file range
+    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
+        length);
     if (null == blockLocations) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
@@ -1876,10 +1879,11 @@ public class DFSClient implements java.i
     boolean refetchBlocks = false;
     int lastRetriedIndex = -1;
 
-    //get block checksum for each block
-    for(int i = 0; i < locatedblocks.size(); i++) {
+    // get block checksum for each block
+    long remaining = length;
+    for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
       if (refetchBlocks) {  // refetch to get fresh tokens
-        blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+        blockLocations = callGetBlockLocations(namenode, src, 0, length);
         if (null == blockLocations) {
           throw new FileNotFoundException("File does not exist: " + src);
         }
@@ -1888,10 +1892,14 @@ public class DFSClient implements java.i
       }
       LocatedBlock lb = locatedblocks.get(i);
       final ExtendedBlock block = lb.getBlock();
+      if (remaining < block.getNumBytes()) {
+        block.setNumBytes(remaining);
+      }
+      remaining -= block.getNumBytes();
       final DatanodeInfo[] datanodes = lb.getLocations();
       
       //try each datanode location of the block
-      final int timeout = 3000 * datanodes.length + socketTimeout;
+      final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
       boolean done = false;
       for(int j = 0; !done && j < datanodes.length; j++) {
         DataOutputStream out = null;
@@ -1899,8 +1907,7 @@ public class DFSClient implements java.i
         
         try {
           //connect to a datanode
-          IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
-              encryptionKey, datanodes[j], timeout);
+          IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
           out = new DataOutputStream(new BufferedOutputStream(pair.out,
               HdfsConstants.SMALL_BUFFER_SIZE));
           in = new DataInputStream(pair.in);
@@ -1956,9 +1963,7 @@ public class DFSClient implements java.i
           } else {
             LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
                       "inferring checksum by reading first byte");
-            ct = inferChecksumTypeByReading(
-                clientName, socketFactory, socketTimeout, lb, datanodes[j],
-                encryptionKey, connectToDnViaHostname);
+            ct = inferChecksumTypeByReading(lb, datanodes[j]);
           }
 
           if (i == 0) { // first block
@@ -2032,16 +2037,13 @@ public class DFSClient implements java.i
   * Connect to the given datanode's data transfer port, and return
    * the resulting IOStreamPair. This includes encryption wrapping, etc.
    */
-  private static IOStreamPair connectToDN(
-      SocketFactory socketFactory, boolean connectToDnViaHostname,
-      DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
-      throws IOException
-  {
+  private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
+      LocatedBlock lb) throws IOException {
     boolean success = false;
     Socket sock = null;
     try {
       sock = socketFactory.createSocket();
-      String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+      String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Connecting to datanode " + dnAddr);
       }
@@ -2050,13 +2052,8 @@ public class DFSClient implements java.i
   
       OutputStream unbufOut = NetUtils.getOutputStream(sock);
       InputStream unbufIn = NetUtils.getInputStream(sock);
-      IOStreamPair ret;
-      if (encryptionKey != null) {
-        ret = DataTransferEncryptor.getEncryptedStreams(
-                unbufOut, unbufIn, encryptionKey);
-      } else {
-        ret = new IOStreamPair(unbufIn, unbufOut);        
-      }
+      IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
+        lb.getBlockToken(), dn);
       success = true;
       return ret;
     } finally {
@@ -2072,21 +2069,14 @@ public class DFSClient implements java.i
    * with older HDFS versions which did not include the checksum type in
    * OpBlockChecksumResponseProto.
    *
-   * @param in input stream from datanode
-   * @param out output stream to datanode
    * @param lb the located block
-   * @param clientName the name of the DFSClient requesting the checksum
    * @param dn the connected datanode
    * @return the inferred checksum type
    * @throws IOException if an error occurs
    */
-  private static Type inferChecksumTypeByReading(
-      String clientName, SocketFactory socketFactory, int socketTimeout,
-      LocatedBlock lb, DatanodeInfo dn,
-      DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+  private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
       throws IOException {
-    IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
-        encryptionKey, dn, socketTimeout);
+    IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);
 
     try {
       DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
@@ -2116,7 +2106,7 @@ public class DFSClient implements java.i
   /**
    * Set permissions to a file or directory.
    * @param src path name.
-   * @param permission
+   * @param permission permission to set to
    * 
    * @see ClientProtocol#setPermission(String, FsPermission)
    */
@@ -2204,6 +2194,11 @@ public class DFSClient implements java.i
     return namenode.getDatanodeReport(type);
   }
     
+  public DatanodeStorageReport[] getDatanodeStorageReport(
+      DatanodeReportType type) throws IOException {
+    return namenode.getDatanodeStorageReport(type);
+  }
+
   /**
    * Enter, leave or get safe mode.
    * 
@@ -2436,8 +2431,8 @@ public class DFSClient implements java.i
   }
 
   @VisibleForTesting
-  ExtendedBlock getPreviousBlock(String file) {
-    return filesBeingWritten.get(file).getBlock();
+  ExtendedBlock getPreviousBlock(long fileId) {
+    return filesBeingWritten.get(fileId).getBlock();
   }
   
   /**
@@ -2757,9 +2752,102 @@ public class DFSClient implements java.i
                                      UnresolvedPathException.class);
     }
   }
+  
+  public void setXAttr(String src, String name, byte[] value, 
+      EnumSet<XAttrSetFlag> flag) throws IOException {
+    checkOpen();
+    try {
+      namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class,
+                                     NSQuotaExceededException.class,
+                                     SafeModeException.class,
+                                     SnapshotAccessControlException.class,
+                                     UnresolvedPathException.class);
+    }
+  }
+  
+  public byte[] getXAttr(String src, String name) throws IOException {
+    checkOpen();
+    try {
+      final List<XAttr> xAttrs = XAttrHelper.buildXAttrAsList(name);
+      final List<XAttr> result = namenode.getXAttrs(src, xAttrs);
+      return XAttrHelper.getFirstXAttrValue(result);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class,
+                                     UnresolvedPathException.class);
+    }
+  }
+  
+  public Map<String, byte[]> getXAttrs(String src) throws IOException {
+    checkOpen();
+    try {
+      return XAttrHelper.buildXAttrMap(namenode.getXAttrs(src, null));
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class,
+                                     UnresolvedPathException.class);
+    }
+  }
+  
+  public Map<String, byte[]> getXAttrs(String src, List<String> names) 
+      throws IOException {
+    checkOpen();
+    try {
+      return XAttrHelper.buildXAttrMap(namenode.getXAttrs(
+          src, XAttrHelper.buildXAttrs(names)));
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class,
+                                     UnresolvedPathException.class);
+    }
+  }
+  
+  public List<String> listXAttrs(String src)
+          throws IOException {
+    checkOpen();
+    try {
+      final Map<String, byte[]> xattrs =
+        XAttrHelper.buildXAttrMap(namenode.listXAttrs(src));
+      return Lists.newArrayList(xattrs.keySet());
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class,
+                                     UnresolvedPathException.class);
+    }
+  }
+
+  public void removeXAttr(String src, String name) throws IOException {
+    checkOpen();
+    try {
+      namenode.removeXAttr(src, XAttrHelper.buildXAttr(name));
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class,
+                                     NSQuotaExceededException.class,
+                                     SafeModeException.class,
+                                     SnapshotAccessControlException.class,
+                                     UnresolvedPathException.class);
+    }
+  }
+
+  public void checkAccess(String src, FsAction mode) throws IOException {
+    checkOpen();
+    try {
+      namenode.checkAccess(src, mode);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+          FileNotFoundException.class,
+          UnresolvedPathException.class);
+    }
+  }
 
   @Override // RemotePeerFactory
-  public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
+  public Peer newConnectedPeer(InetSocketAddress addr,
+      Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+      throws IOException {
     Peer peer = null;
     boolean success = false;
     Socket sock = null;
@@ -2768,8 +2856,8 @@ public class DFSClient implements java.i
       NetUtils.connect(sock, addr,
         getRandomLocalInterfaceAddr(),
         dfsClientConf.socketTimeout);
-      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
-          getDataEncryptionKey());
+      peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
+          blockToken, datanodeId);
       success = true;
       return peer;
     } finally {

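The DFSClient hunks above key the lease-renewal map by inode ID instead of file path, route datanode connections through the new SaslDataTransferClient, and add public helpers for a length-bounded getFileChecksum(src, length), extended attributes, and checkAccess. A minimal sketch of how a caller might exercise the new xattr and checksum entry points; the path, attribute name, and values are invented, and a real application would normally use DistributedFileSystem rather than the @InterfaceAudience.Private DFSClient API:

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.XAttrSetFlag;
    import org.apache.hadoop.hdfs.DFSClient;

    public class XAttrExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes fs.defaultFS points at a running HDFS cluster.
        DFSClient client = new DFSClient(conf);
        try {
          // Create or replace a user-namespace extended attribute.
          client.setXAttr("/tmp/f", "user.origin", "backup-job".getBytes("UTF-8"),
              EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
          byte[] value = client.getXAttr("/tmp/f", "user.origin");
          System.out.println(new String(value, "UTF-8"));
          // Checksum of the first megabyte only, via the new length argument.
          System.out.println(client.getFileChecksum("/tmp/f", 1024 * 1024));
        } finally {
          client.close();
        }
      }
    }

The deprecated DFSClient(Configuration) convenience constructor ("same as this(NameNode.getAddress(conf), conf)") is used here only to keep the sketch short.
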
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -29,6 +31,7 @@ import org.apache.hadoop.classification.
 @InterfaceAudience.Private
 public class DFSClientFaultInjector {
   public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+  public static AtomicLong exceptionNum = new AtomicLong(0);
 
   public static DFSClientFaultInjector get() {
     return instance;
@@ -47,4 +50,8 @@ public class DFSClientFaultInjector {
   }
 
   public void startFetchFromDatanode() {}
+
+  public void fetchFromDatanodeException() {}
+
+  public void readFromDatanodeDelay() {}
 }

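DFSClientFaultInjector remains a no-op in production; tests replace the static instance so that the new fetchFromDatanodeException() and readFromDatanodeDelay() hooks, which the DFSInputStream changes below call into, actually misbehave. A hedged sketch of a test-only subclass (the failure count and sleep duration are arbitrary):

    import org.apache.hadoop.hdfs.DFSClientFaultInjector;

    /** Test-only injector: fails the first two datanode fetches, then recovers. */
    class FlakyDatanodeInjector extends DFSClientFaultInjector {
      @Override
      public void fetchFromDatanodeException() {
        // exceptionNum is the shared counter added in the hunk above.
        if (DFSClientFaultInjector.exceptionNum.getAndIncrement() < 2) {
          throw new RuntimeException("injected fetch failure");
        }
      }

      @Override
      public void readFromDatanodeDelay() {
        try {
          Thread.sleep(100); // simulate a slow read path
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }

A test would install it with DFSClientFaultInjector.instance = new FlakyDatanodeInjector(); before issuing reads, and restore the original instance afterwards.
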
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Aug 19 23:49:39 2014
@@ -105,12 +105,16 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
   public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+  public static final String  DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves";
+  public static final int     DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5;
   public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
   public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
   public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
   public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
   public static final String  DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
   public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
+  public static final String  DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY = "dfs.datanode.sync.behind.writes.in.background";
+  public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT = false;
   public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
   public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
   public static final String  DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
@@ -126,6 +130,7 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
+  public static final String  DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host";
   public static final String  DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
   public static final String  DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host";
   public static final String  DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address";
@@ -191,9 +196,14 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup";
   public static final String  DFS_NAMENODE_ACLS_ENABLED_KEY = "dfs.namenode.acls.enabled";
   public static final boolean DFS_NAMENODE_ACLS_ENABLED_DEFAULT = false;
+  public static final String  DFS_NAMENODE_XATTRS_ENABLED_KEY = "dfs.namenode.xattrs.enabled";
+  public static final boolean DFS_NAMENODE_XATTRS_ENABLED_DEFAULT = true;
   public static final String  DFS_ADMIN = "dfs.cluster.administrators";
   public static final String  DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.https.server.keystore.resource";
   public static final String  DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-server.xml";
+  public static final String  DFS_SERVER_HTTPS_KEYPASSWORD_KEY = "ssl.server.keystore.keypassword";
+  public static final String  DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY = "ssl.server.keystore.password";
+  public static final String  DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY = "ssl.server.truststore.password";
   public static final String  DFS_NAMENODE_NAME_DIR_RESTORE_KEY = "dfs.namenode.name.dir.restore";
   public static final boolean DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT = false;
   public static final String  DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY = "dfs.namenode.support.allow.format";
@@ -207,6 +217,9 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
   public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
 
+  public static final String DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK = "dfs.namenode.randomize-block-locations-per-block";
+  public static final boolean DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT = false;
+
   public static final String  DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
   public static final int     DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;
 
@@ -244,6 +257,10 @@ public class DFSConfigKeys extends Commo
       "dfs.namenode.path.based.cache.refresh.interval.ms";
   public static final long    DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 30000L;
 
+  /** Pending period of block deletion since NameNode startup */
+  public static final String  DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY = "dfs.namenode.startup.delay.block.deletion.sec";
+  public static final long    DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT = 0L;
+
   // Whether to enable datanode's stale state detection and usage for reads
   public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
   public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
@@ -290,11 +307,17 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT = 1024*1024;
   public static final String  DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY = "dfs.namenode.fs-limits.max-blocks-per-file";
   public static final long    DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT = 1024*1024;
+  public static final String  DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY = "dfs.namenode.fs-limits.max-xattrs-per-inode";
+  public static final int     DFS_NAMENODE_MAX_XATTRS_PER_INODE_DEFAULT = 32;
+  public static final String  DFS_NAMENODE_MAX_XATTR_SIZE_KEY = "dfs.namenode.fs-limits.max-xattr-size";
+  public static final int     DFS_NAMENODE_MAX_XATTR_SIZE_DEFAULT = 16384;
+
 
   //Following keys have no defaults
   public static final String  DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
   public static final int     DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470;
   public static final String  DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
+  public static final String  DFS_NAMENODE_HTTPS_BIND_HOST_KEY = "dfs.namenode.https-bind-host";
   public static final String  DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTPS_PORT_DEFAULT;
   public static final String  DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
   public static final String  DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
@@ -361,8 +384,6 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;
   public static final String  DFS_DATANODE_MAX_RECEIVER_THREADS_KEY = "dfs.datanode.max.transfer.threads";
   public static final int     DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
-  public static final String  DFS_DATANODE_NUMBLOCKS_KEY = "dfs.datanode.numblocks";
-  public static final int     DFS_DATANODE_NUMBLOCKS_DEFAULT = 64;
   public static final String  DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
   public static final int     DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
   public static final String  DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
@@ -379,8 +400,6 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
   public static final String  DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
   public static final int     DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
-  public static final String  DFS_SUPPORT_APPEND_KEY = "dfs.support.append";
-  public static final boolean DFS_SUPPORT_APPEND_DEFAULT = true;
   public static final String  DFS_HTTP_POLICY_KEY = "dfs.http.policy";
   public static final String  DFS_HTTP_POLICY_DEFAULT =  HttpConfig.Policy.HTTP_ONLY.name();
   public static final String  DFS_DEFAULT_CHUNK_VIEW_SIZE_KEY = "dfs.default.chunk.view.size";
@@ -481,18 +500,29 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup";
   public static final String  DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file";
   public static final String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal";
+  @Deprecated
+  public static final String  DFS_DATANODE_USER_NAME_KEY = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
   public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths";
   public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT = "/dev/shm,/tmp";
   public static final String  DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
   public static final int     DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000;
   public static final String  DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
   public static final String  DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY = "dfs.namenode.kerberos.principal";
+  @Deprecated
+  public static final String  DFS_NAMENODE_USER_NAME_KEY = DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
   public static final String  DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.namenode.kerberos.internal.spnego.principal";
+  @Deprecated
+  public static final String  DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY = DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
   public static final String  DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY = "dfs.secondary.namenode.keytab.file";
   public static final String  DFS_SECONDARY_NAMENODE_KERBEROS_PRINCIPAL_KEY = "dfs.secondary.namenode.kerberos.principal";
+  @Deprecated
+  public static final String  DFS_SECONDARY_NAMENODE_USER_NAME_KEY = DFS_SECONDARY_NAMENODE_KERBEROS_PRINCIPAL_KEY;
   public static final String  DFS_SECONDARY_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.secondary.namenode.kerberos.internal.spnego.principal";
+  @Deprecated
+  public static final String  DFS_SECONDARY_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY = DFS_SECONDARY_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
   public static final String  DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
   public static final int     DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
+  public static final String  DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY = "dfs.namenode.legacy-oiv-image.dir";
   
   public static final String  DFS_NAMESERVICES = "dfs.nameservices";
   public static final String  DFS_NAMESERVICE_ID = "dfs.nameservice.id";
@@ -535,6 +565,8 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
   public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
+  public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
+  public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
   
   // Journal-node related configs. These are read on the JN side.
   public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
@@ -626,7 +658,17 @@ public class DFSConfigKeys extends Commo
 
   public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
       "dfs.client.hedged.read.threadpool.size";
-  public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
-  public static final String DFS_NFS_KEYTAB_FILE_KEY = "dfs.nfs.keytab.file";
-  public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "dfs.nfs.kerberos.principal";
+  public static final int     DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
+
+   // Slow io warning log threshold settings for dfsclient and datanode.
+   public static final String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
+     "dfs.client.slow.io.warning.threshold.ms";
+   public static final long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
+   public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
+     "dfs.datanode.slow.io.warning.threshold.ms";
+   public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
+
+   public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY =
+       "dfs.datanode.block.id.layout.upgrade.threads";
+   public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12;
 }

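Among the DFSConfigKeys additions above are the client and datanode slow-I/O warning thresholds, dfs.data.transfer.protection for SASL-protected block transfer, and the balancer's per-datanode concurrent-moves limit. An illustrative snippet that sets a few of the new keys on a Configuration; the chosen values (10 seconds, "integrity", 10 moves) are examples only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class NewKeysExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Warn when a single client-side I/O step stalls for more than 10 seconds.
        conf.setLong(DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, 10000L);
        // Ask for integrity-protected SASL data transfer.
        conf.set(DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY, "integrity");
        // Let the balancer move up to 10 blocks concurrently per datanode.
        conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 10);

        long threshold = conf.getLong(
            DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
            DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
        System.out.println("slow I/O warning threshold = " + threshold + " ms");
      }
    }
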
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -32,12 +33,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -81,6 +84,7 @@ implements ByteBufferReadable, CanSetDro
     HasEnhancedByteBufferAccess {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
+  private long hedgedReadOpsLoopNumForTesting = 0;
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
@@ -389,7 +393,7 @@ implements ByteBufferReadable, CanSetDro
    * Get block at the specified position.
    * Fetch it from the namenode if not cached.
    * 
-   * @param offset
+   * @param offset block corresponding to this offset in file is returned
    * @param updatePosition whether to update current position
    * @return located block
    * @throws IOException
@@ -453,14 +457,13 @@ implements ByteBufferReadable, CanSetDro
    * Get blocks in the specified range.
    * Fetch them from the namenode if not cached. This function
    * will not get a read request beyond the EOF.
-   * @param offset
-   * @param length
+   * @param offset starting offset in file
+   * @param length length of data
    * @return consequent segment of located blocks
    * @throws IOException
    */
-  private synchronized List<LocatedBlock> getBlockRange(long offset, 
-                                                        long length) 
-                                                      throws IOException {
+  private synchronized List<LocatedBlock> getBlockRange(long offset,
+      long length)  throws IOException {
     // getFileLength(): returns total file length
     // locatedBlocks.getFileLength(): returns length of completed blocks
     if (offset >= getFileLength()) {
@@ -847,7 +850,6 @@ implements ByteBufferReadable, CanSetDro
 
   /**
    * Add corrupted block replica into map.
-   * @param corruptedBlockMap 
    */
   private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, 
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
@@ -978,20 +980,15 @@ implements ByteBufferReadable, CanSetDro
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
       final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch latch) {
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
-        try {
-          byte[] buf = bb.array();
-          int offset = bb.position();
-          actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
-              corruptedBlockMap);
-          return bb;
-        } finally {
-          latch.countDown();
-        }
+        byte[] buf = bb.array();
+        int offset = bb.position();
+        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return bb;
       }
     };
   }
@@ -1020,6 +1017,7 @@ implements ByteBufferReadable, CanSetDro
       BlockReader reader = null;
 
       try {
+        DFSClientFaultInjector.get().fetchFromDatanodeException();
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
         int len = (int) (end - start + 1);
         reader = new BlockReaderFactory(dfsClient.getConf()).
@@ -1040,10 +1038,13 @@ implements ByteBufferReadable, CanSetDro
             setConfiguration(dfsClient.getConfiguration()).
             build();
         int nread = reader.readAll(buf, offset, len);
+        updateReadStatistics(readStatistics, nread, reader);
+
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
                                 "expected " + len + ", got " + nread);
         }
+        DFSClientFaultInjector.get().readFromDatanodeDelay();
         return;
       } catch (ChecksumException e) {
         String msg = "fetchBlockByteRange(). Got a checksum exception for "
@@ -1091,49 +1092,49 @@ implements ByteBufferReadable, CanSetDro
    * int, Map)} except we start up a second, parallel, 'hedged' read
    * if the first read is taking longer than the configured amount of
    * time. We then wait on whichever read returns first.
-   * 
-   * @param block
-   * @param start
-   * @param end
-   * @param buf
-   * @param offset
-   * @param corruptedBlockMap
-   * @throws IOException
    */
   private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
+    CompletionService<ByteBuffer> hedgedService =
+        new ExecutorCompletionService<ByteBuffer>(
+        dfsClient.getHedgedReadsThreadPool());
     ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     block = getBlockAt(block.getStartOffset(), false);
-    // Latch shared by all outstanding reads.  First to finish closes
-    CountDownLatch hasReceivedResult = new CountDownLatch(1);
     while (true) {
+      // see HDFS-6591, this metric is used to verify/catch unnecessary loops
+      hedgedReadOpsLoopNumForTesting++;
       DNAddrPair chosenNode = null;
-      Future<ByteBuffer> future = null;
-      // futures is null if there is no request already executing.
+      // there is no request already executing.
       if (futures.isEmpty()) {
-        // chooseDataNode is a commitment.  If no node, we go to
-        // the NN to reget block locations.  Only go here on first read.
+        // chooseDataNode is a commitment. If no node, we go to
+        // the NN to reget block locations. Only go here on first read.
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
-        future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-          corruptedBlockMap, hasReceivedResult);
+        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+            chosenNode, block, start, end, bb, corruptedBlockMap);
+        Future<ByteBuffer> firstRequest = hedgedService
+            .submit(getFromDataNodeCallable);
+        futures.add(firstRequest);
         try {
-          future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
-          return;
-        } catch (TimeoutException e) {
+          Future<ByteBuffer> future = hedgedService.poll(
+              dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+          if (future != null) {
+            future.get();
+            return;
+          }
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() +
-              "ms to read from " + chosenNode.info + "; spawning hedged read");
+            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
+                + "ms to read from " + chosenNode.info
+                + "; spawning hedged read");
           }
           // Ignore this node on next go around.
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
-          futures.add(future);
           continue; // no need to refresh block locations
         } catch (InterruptedException e) {
           // Ignore
@@ -1141,25 +1142,31 @@ implements ByteBufferReadable, CanSetDro
           // Ignore already logged in the call.
         }
       } else {
-        // We are starting up a 'hedged' read.  We have a read already
+        // We are starting up a 'hedged' read. We have a read already
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
         // If no nodes to do hedged reads against, pass.
         try {
-          chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          try {
+            chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          } catch (IOException ioe) {
+            chosenNode = chooseDataNode(block, ignored);
+          }
           bb = ByteBuffer.allocate(len);
-          future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-            corruptedBlockMap, hasReceivedResult);
-          futures.add(future);
+          Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+              chosenNode, block, start, end, bb, corruptedBlockMap);
+          Future<ByteBuffer> oneMoreRequest = hedgedService
+              .submit(getFromDataNodeCallable);
+          futures.add(oneMoreRequest);
         } catch (IOException ioe) {
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Failed getting node for hedged read: " +
-              ioe.getMessage());
+            DFSClient.LOG.debug("Failed getting node for hedged read: "
+                + ioe.getMessage());
           }
         }
         // if not succeeded. Submit callables for each datanode in a loop, wait
         // for a fixed interval and get the result from the fastest one.
         try {
-          ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+          ByteBuffer result = getFirstToComplete(hedgedService, futures);
           // cancel the rest.
           cancelAll(futures);
           if (result.array() != buf) { // compare the array pointers
@@ -1171,50 +1178,43 @@ implements ByteBufferReadable, CanSetDro
           }
           return;
         } catch (InterruptedException ie) {
-          // Ignore
-        } catch (ExecutionException e) {
-          // exception already handled in the call method. getFirstToComplete
-          // will remove the failing future from the list. nothing more to do.
+          // Ignore and retry
         }
-        // We got here if exception.  Ignore this node on next go around IFF
+        // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
         if (chosenNode != null && chosenNode.info != null) {
           ignored.add(chosenNode.info);
         }
       }
-      // executed if we get an error from a data node
-      block = getBlockAt(block.getStartOffset(), false);
     }
   }
 
-  private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
-      final LocatedBlock block, long start,
-      final long end, final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch hasReceivedResult) {
-    Callable<ByteBuffer> getFromDataNodeCallable =
-        getFromOneDataNode(chosenNode, block, start, end, bb,
-          corruptedBlockMap, hasReceivedResult);
-    return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
+  @VisibleForTesting
+  public long getHedgedReadOpsLoopNumForTesting() {
+    return hedgedReadOpsLoopNumForTesting;
   }
 
-  private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
-      CountDownLatch latch) throws ExecutionException, InterruptedException {
-    latch.await();
-    for (Future<ByteBuffer> future : futures) {
-      if (future.isDone()) {
-        try {
-          return future.get();
-        } catch (ExecutionException e) {
-          // already logged in the Callable
-          futures.remove(future);
-          throw e;
-        }
-      }
+  private ByteBuffer getFirstToComplete(
+      CompletionService<ByteBuffer> hedgedService,
+      ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
+    if (futures.isEmpty()) {
+      throw new InterruptedException("let's retry");
     }
-    throw new InterruptedException("latch has counted down to zero but no"
-        + "result available yet, for safety try to request another one from"
-        + "outside loop, this should be rare");
+    Future<ByteBuffer> future = null;
+    try {
+      future = hedgedService.take();
+      ByteBuffer bb = future.get();
+      futures.remove(future);
+      return bb;
+    } catch (ExecutionException e) {
+      // already logged in the Callable
+      futures.remove(future);
+    } catch (CancellationException ce) {
+      // already logged in the Callable
+      futures.remove(future);
+    }
+
+    throw new InterruptedException("let's retry");
   }
 
   private void cancelAll(List<Future<ByteBuffer>> futures) {
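
For context on the hunk above: the hedged-read path now hands each speculative request to a CompletionService and simply takes whichever future finishes first, cancelling the rest, instead of spinning on a CountDownLatch. Below is a minimal, self-contained sketch of that pattern only; the class and task names are made up and this is not the DFSInputStream implementation.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class HedgedReadSketch {
      // Submit the same logical read to several replicas; keep whichever
      // answer arrives first and cancel the losers.
      static String firstToComplete(ExecutorService pool, List<Callable<String>> hedges)
          throws InterruptedException {
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        List<Future<String>> futures = new ArrayList<>();
        for (Callable<String> hedge : hedges) {
          futures.add(service.submit(hedge));
        }
        try {
          Future<String> done = service.take(); // blocks until *some* hedge finishes
          futures.remove(done);
          return done.get();
        } catch (ExecutionException failedHedge) {
          return null; // a failed hedge is dropped; the caller may retry
        } finally {
          for (Future<String> loser : futures) {
            loser.cancel(true);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Callable<String>> hedges = Arrays.asList(
            () -> { Thread.sleep(200); return "slow replica"; },
            () -> { Thread.sleep(20); return "fast replica"; });
        System.out.println(firstToComplete(pool, hedges)); // expected: "fast replica"
        pool.shutdown();
      }
    }
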
@@ -1375,10 +1375,10 @@ implements ByteBufferReadable, CanSetDro
   @Override
   public synchronized void seek(long targetPos) throws IOException {
     if (targetPos > getFileLength()) {
-      throw new IOException("Cannot seek after EOF");
+      throw new EOFException("Cannot seek after EOF");
     }
     if (targetPos < 0) {
-      throw new IOException("Cannot seek to negative offset");
+      throw new EOFException("Cannot seek to negative offset");
     }
     if (closed) {
       throw new IOException("Stream is closed!");
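
The seek() change above narrows the failure type: out-of-range positions now raise EOFException, which extends IOException, so existing callers that catch IOException are unaffected, while new callers can single out the end-of-file case. A small, hypothetical caller-side sketch:

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;

    class SeekExample {
      // Hypothetical helper: EOFException is an IOException, so older
      // catch (IOException) blocks keep working; newer code can special-case it.
      static void seekOrIgnoreEof(FSDataInputStream in, long pos) throws IOException {
        try {
          in.seek(pos);
        } catch (EOFException pastEnd) {
          // seek target was negative or beyond the file length
        }
      }
    }
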

Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
 Tue Aug 19 23:49:39 2014
@@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.N
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
@@ -122,6 +121,7 @@ public class DFSOutputStream extends FSO
     implements Syncable, CanSetDropBehind {
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
   private final DFSClient dfsClient;
+  private final long dfsclientSlowLogThresholdMs;
   private Socket s;
   // closed is accessed by different threads under different locks.
   private volatile boolean closed = false;
@@ -313,6 +313,7 @@ public class DFSOutputStream extends FSO
     private DataInputStream blockReplyStream;
     private ResponseProcessor response = null;
     private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+    private volatile StorageType[] storageTypes = null;
     private volatile String[] storageIDs = null;
     private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
         CacheBuilder.newBuilder()
@@ -417,10 +418,12 @@ public class DFSOutputStream extends FSO
     }
     
     private void setPipeline(LocatedBlock lb) {
-      setPipeline(lb.getLocations(), lb.getStorageIDs());
+      setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
     }
-    private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) {
+    private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
+        String[] storageIDs) {
       this.nodes = nodes;
+      this.storageTypes = storageTypes;
       this.storageIDs = storageIDs;
     }
 
@@ -446,7 +449,7 @@ public class DFSOutputStream extends FSO
       this.setName("DataStreamer for file " + src);
       closeResponder();
       closeStream();
-      setPipeline(null, null);
+      setPipeline(null, null, null);
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
     }
     
@@ -788,11 +791,19 @@ public class DFSOutputStream extends FSO
           // process responses from datanodes.
           try {
             // read an ack from the pipeline
+            long begin = Time.monotonicNow();
             ack.readFields(blockReplyStream);
-            if (DFSClient.LOG.isDebugEnabled()) {
+            long duration = Time.monotonicNow() - begin;
+            if (duration > dfsclientSlowLogThresholdMs
+                && ack.getSeqno() != Packet.HEART_BEAT_SEQNO) {
+              DFSClient.LOG
+                  .warn("Slow ReadProcessor read fields took " + duration
+                      + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
+                      + ack + ", targets: " + Arrays.asList(targets));
+            } else if (DFSClient.LOG.isDebugEnabled()) {
               DFSClient.LOG.debug("DFSClient " + ack);
             }
-            
+
             long seqno = ack.getSeqno();
             // processes response status from datanodes.
             for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
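
The ack-reading hunk above times the blocking readFields() call with a monotonic clock and warns only when the duration exceeds the new dfsclientSlowLogThresholdMs setting (heartbeat packets are exempt). A generic sketch of that measure-and-warn pattern, not tied to HDFS types:

    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;

    class SlowCallLogger {
      // Time a blocking call with a monotonic clock and warn only when it is
      // slower than a configured threshold, as the ReadProcessor hunk does.
      static <T> T timed(String opName, long thresholdMs, Callable<T> op) throws Exception {
        long begin = System.nanoTime(); // monotonic, like Time.monotonicNow()
        try {
          return op.call();
        } finally {
          long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
          if (tookMs > thresholdMs) {
            System.err.println("Slow " + opName + " took " + tookMs
                + "ms (threshold=" + thresholdMs + "ms)");
          }
        }
      }

      public static void main(String[] args) throws Exception {
        timed("sleepyRead", 50, () -> { Thread.sleep(100); return null; });
      }
    }
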
@@ -1012,7 +1023,7 @@ public class DFSOutputStream extends FSO
       //get a new datanode
       final DatanodeInfo[] original = nodes;
       final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-          src, block, nodes, storageIDs,
+          src, fileId, block, nodes, storageIDs,
           failed.toArray(new DatanodeInfo[failed.size()]),
           1, dfsClient.clientName);
       setPipeline(lb);
@@ -1023,10 +1034,12 @@ public class DFSOutputStream extends FSO
       //transfer replica
       final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
       final DatanodeInfo[] targets = {nodes[d]};
-      transfer(src, targets, lb.getBlockToken());
+      final StorageType[] targetStorageTypes = {storageTypes[d]};
+      transfer(src, targets, targetStorageTypes, lb.getBlockToken());
     }
 
     private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+        final StorageType[] targetStorageTypes,
         final Token<BlockTokenIdentifier> blockToken) throws IOException {
       //transfer replica to the new datanode
       Socket sock = null;
@@ -1038,21 +1051,17 @@ public class DFSOutputStream extends FSO
         
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        if (dfsClient.shouldEncryptData() && 
-            !dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
+        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
+          unbufOut, unbufIn, dfsClient, blockToken, src);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
         in = new DataInputStream(unbufIn);
 
         //send the TRANSFER_BLOCK request
         new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
-            targets);
+            targets, targetStorageTypes);
         out.flush();
 
         //ack
@@ -1131,16 +1140,15 @@ public class DFSOutputStream extends FSO
           failed.add(nodes[errorIndex]);
 
           DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
-          System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-          System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-              newnodes.length-errorIndex);
+          arraycopy(nodes, newnodes, errorIndex);
+
+          final StorageType[] newStorageTypes = new StorageType[newnodes.length];
+          arraycopy(storageTypes, newStorageTypes, errorIndex);
 
           final String[] newStorageIDs = new String[newnodes.length];
-          System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex);
-          System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex,
-              newStorageIDs.length-errorIndex);
+          arraycopy(storageIDs, newStorageIDs, errorIndex);
           
-          setPipeline(newnodes, newStorageIDs);
+          setPipeline(newnodes, newStorageTypes, newStorageIDs);
 
           // Just took care of a node error while waiting for a node restart
           if (restartingNodeIndex >= 0) {
@@ -1177,7 +1185,7 @@ public class DFSOutputStream extends FSO
         
         // set up the pipeline again with the remaining nodes
         if (failPacket) { // for testing
-          success = createBlockOutputStream(nodes, newGS, isRecovery);
+          success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
           failPacket = false;
           try {
             // Give DNs time to send in bad reports. In real situations,
@@ -1186,7 +1194,7 @@ public class DFSOutputStream extends FSO
             Thread.sleep(2000);
           } catch (InterruptedException ie) {}
         } else {
-          success = createBlockOutputStream(nodes, newGS, isRecovery);
+          success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
         }
 
         if (restartingNodeIndex >= 0) {
@@ -1238,6 +1246,7 @@ public class DFSOutputStream extends FSO
     private LocatedBlock nextBlockOutputStream() throws IOException {
       LocatedBlock lb = null;
       DatanodeInfo[] nodes = null;
+      StorageType[] storageTypes = null;
       int count = dfsClient.getConf().nBlockWriteRetry;
       boolean success = false;
       ExtendedBlock oldBlock = block;
@@ -1260,15 +1269,17 @@ public class DFSOutputStream extends FSO
         bytesSent = 0;
         accessToken = lb.getBlockToken();
         nodes = lb.getLocations();
+        storageTypes = lb.getStorageTypes();
 
         //
         // Connect to first DataNode in the list.
         //
-        success = createBlockOutputStream(nodes, 0L, false);
+        success = createBlockOutputStream(nodes, storageTypes, 0L, false);
 
         if (!success) {
           DFSClient.LOG.info("Abandoning " + block);
-          dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
+          dfsClient.namenode.abandonBlock(block, fileId, src,
+              dfsClient.clientName);
           block = null;
           DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
           excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
@@ -1284,8 +1295,8 @@ public class DFSOutputStream extends FSO
     // connects to the first datanode in the pipeline
     // Returns true if success, otherwise return failure.
     //
-    private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
-        boolean recoveryFlag) {
+    private boolean createBlockOutputStream(DatanodeInfo[] nodes,
+        StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
       if (nodes.length == 0) {
         DFSClient.LOG.info("nodes are empty for write pipeline of block "
             + block);
@@ -1315,14 +1326,10 @@ public class DFSOutputStream extends FSO
           
           OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
           InputStream unbufIn = NetUtils.getInputStream(s);
-          if (dfsClient.shouldEncryptData()  && 
-              !dfsClient.trustedChannelResolver.isTrusted(s.getInetAddress())) {
-            IOStreamPair encryptedStreams =
-                DataTransferEncryptor.getEncryptedStreams(unbufOut,
-                    unbufIn, dfsClient.getDataEncryptionKey());
-            unbufOut = encryptedStreams.out;
-            unbufIn = encryptedStreams.in;
-          }
+          IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
+            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
+          unbufOut = saslStreams.out;
+          unbufIn = saslStreams.in;
           out = new DataOutputStream(new BufferedOutputStream(unbufOut,
               HdfsConstants.SMALL_BUFFER_SIZE));
           blockReplyStream = new DataInputStream(unbufIn);
@@ -1331,9 +1338,10 @@ public class DFSOutputStream extends FSO
           // Xmit header info to datanode
           //
   
+          BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
           // send the request
-          new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
-              nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
+          new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
+              dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
               nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
               cachingStrategy.get());
   
@@ -1569,6 +1577,8 @@ public class DFSOutputStream extends FSO
                             
     }
     this.checksum = checksum;
+    this.dfsclientSlowLogThresholdMs =
+      dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
   }
 
   /** Construct a new output stream for creating a file. */
@@ -1914,7 +1924,8 @@ public class DFSOutputStream extends FSO
       // namenode.
       if (persistBlocks.getAndSet(false) || updateLength) {
         try {
-          dfsClient.namenode.fsync(src, dfsClient.clientName, lastBlockLength);
+          dfsClient.namenode.fsync(src, fileId,
+              dfsClient.clientName, lastBlockLength);
         } catch (IOException ioe) {
           DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
           // If we got an error here, it might be because some other thread called
@@ -1999,6 +2010,7 @@ public class DFSOutputStream extends FSO
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Waiting for ack for: " + seqno);
     }
+    long begin = Time.monotonicNow();
     try {
       synchronized (dataQueue) {
         while (!closed) {
@@ -2018,6 +2030,11 @@ public class DFSOutputStream extends FSO
       checkClosed();
     } catch (ClosedChannelException e) {
     }
+    long duration = Time.monotonicNow() - begin;
+    if (duration > dfsclientSlowLogThresholdMs) {
+      DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
+          + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
+    }
   }
 
   private synchronized void start() {
@@ -2035,7 +2052,7 @@ public class DFSOutputStream extends FSO
     streamer.setLastException(new IOException("Lease timeout of "
         + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
     closeThreads(true);
-    dfsClient.endFileLease(src);
+    dfsClient.endFileLease(fileId);
   }
 
   // shutdown datastreamer and responseprocessor threads.
@@ -2090,7 +2107,7 @@ public class DFSOutputStream extends FSO
       ExtendedBlock lastBlock = streamer.getBlock();
       closeThreads(false);
       completeFile(lastBlock);
-      dfsClient.endFileLease(src);
+      dfsClient.endFileLease(fileId);
     } catch (ClosedChannelException e) {
     } finally {
       closed = true;
@@ -2119,12 +2136,12 @@ public class DFSOutputStream extends FSO
             throw new IOException(msg);
         }
         try {
-          Thread.sleep(localTimeout);
           if (retries == 0) {
             throw new IOException("Unable to close file because the last block"
                 + " does not have enough number of replicas.");
           }
           retries--;
+          Thread.sleep(localTimeout);
           localTimeout *= 2;
           if (Time.now() - localstart > 5000) {
             DFSClient.LOG.info("Could not complete " + src + " retrying...");
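
The retry hunk above reorders the loop so the client checks the remaining retry budget before sleeping, then doubles localTimeout for the next round; previously it slept once more even when no further attempt would follow. A hedged sketch of the resulting check-then-backoff shape (complete() below is a stand-in supplier, not the real NameNode call):

    import java.io.IOException;
    import java.util.function.BooleanSupplier;

    class CompleteFileRetrySketch {
      // Check the remaining budget first, sleep only if another attempt will
      // follow, and double the wait each round (exponential backoff).
      static void retryUntilComplete(BooleanSupplier complete, int retries, long sleepMs)
          throws IOException, InterruptedException {
        while (!complete.getAsBoolean()) {
          if (retries == 0) {
            throw new IOException("Unable to close file: retries exhausted");
          }
          retries--;
          Thread.sleep(sleepMs);
          sleepMs *= 2;
        }
      }

      public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Succeeds on the third poll; sleeps 50ms, then 100ms, in between.
        retryUntilComplete(() -> ++calls[0] >= 3, 5, 50);
        System.out.println("completed after " + calls[0] + " polls");
      }
    }
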
@@ -2184,7 +2201,12 @@ public class DFSOutputStream extends FSO
   }
 
   @VisibleForTesting
-  long getFileId() {
+  public long getFileId() {
     return fileId;
   }
+
+  private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
+    System.arraycopy(srcs, 0, dsts, 0, skipIndex);
+    System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
+  }
 }
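
The new private arraycopy(srcs, dsts, skipIndex) helper copies every element except the one at skipIndex, which is how the pipeline-recovery code drops a failed datanode from the nodes, storageTypes, and storageIDs arrays in one consistent way. A tiny worked example; the helper is reproduced here only to show its semantics:

    import java.util.Arrays;

    class SkipIndexCopyDemo {
      // Same shape as the private helper added above.
      static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
        System.arraycopy(srcs, 0, dsts, 0, skipIndex);
        System.arraycopy(srcs, skipIndex + 1, dsts, skipIndex, dsts.length - skipIndex);
      }

      public static void main(String[] args) {
        String[] nodes = {"dn1", "dn2", "dn3"};
        String[] remaining = new String[nodes.length - 1];
        arraycopy(nodes, remaining, 1);                 // drop the failed index 1
        System.out.println(Arrays.toString(remaining)); // [dn1, dn3]
      }
    }
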

Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
 Tue Aug 19 23:49:39 2014
@@ -33,6 +33,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
 
 import java.io.IOException;
 import java.io.PrintStream;
@@ -288,9 +291,7 @@ public class DFSUtil {
    * <p>
    * Note that some components are only reserved under certain directories, e.g.
    * "/.reserved" is reserved, while "/hadoop/.reserved" is not.
-   * 
-   * @param component
-   * @return if the component is reserved
+   * @return true, if the component is reserved
    */
   public static boolean isReservedPathComponent(String component) {
     for (String reserved : HdfsConstants.RESERVED_PATH_COMPONENTS) {
@@ -1015,8 +1016,8 @@ public class DFSUtil {
   /**
    * return server http or https address from the configuration for a
    * given namenode rpc address.
-   * @param conf
    * @param namenodeAddr - namenode RPC address
+   * @param conf configuration
    * @param scheme - the scheme (http / https)
    * @return server http or https address
    * @throws IOException 
@@ -1327,7 +1328,7 @@ public class DFSUtil {
   /**
    * For given set of {@code keys} adds nameservice Id and or namenode Id
    * and returns {nameserviceId, namenodeId} when address match is found.
-   * @see #getSuffixIDs(Configuration, String, AddressMatcher)
+   * @see #getSuffixIDs(Configuration, String, String, String, AddressMatcher)
    */
   static String[] getSuffixIDs(final Configuration conf,
       final InetSocketAddress address, final String... keys) {
@@ -1499,9 +1500,8 @@ public class DFSUtil {
   /**
    * Get SPNEGO keytab Key from configuration
    * 
-   * @param conf
-   *          Configuration
-   * @param defaultKey
+   * @param conf Configuration
+   * @param defaultKey default key to be used for config lookup
    * @return DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY if the key is not empty
    *         else return defaultKey
    */
@@ -1534,16 +1534,38 @@ public class DFSUtil {
         .needsClientAuth(
             sslConf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
                 DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT))
-        .keyPassword(sslConf.get("ssl.server.keystore.keypassword"))
+        .keyPassword(getPassword(sslConf, DFS_SERVER_HTTPS_KEYPASSWORD_KEY))
         .keyStore(sslConf.get("ssl.server.keystore.location"),
-            sslConf.get("ssl.server.keystore.password"),
+            getPassword(sslConf, DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY),
             sslConf.get("ssl.server.keystore.type", "jks"))
         .trustStore(sslConf.get("ssl.server.truststore.location"),
-            sslConf.get("ssl.server.truststore.password"),
+            getPassword(sslConf, DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY),
             sslConf.get("ssl.server.truststore.type", "jks"));
   }
 
   /**
+   * Leverages the Configuration.getPassword method to attempt to get
+   * passwords from the CredentialProvider API before falling back to
+   * clear text in config - if falling back is allowed.
+   * @param conf Configuration instance
+   * @param alias name of the credential to retrieve
+   * @return String credential value or null
+   */
+  static String getPassword(Configuration conf, String alias) {
+    String password = null;
+    try {
+      char[] passchars = conf.getPassword(alias);
+      if (passchars != null) {
+        password = new String(passchars);
+      }
+    }
+    catch (IOException ioe) {
+      password = null;
+    }
+    return password;
+  }
+
+  /**
    * Converts a Date into an ISO-8601 formatted datetime string.
    */
   public static String dateToIso8601String(Date date) {
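
DFSUtil.getPassword, added above, delegates to Configuration.getPassword, which consults any configured credential providers before falling back to the clear-text property (when that fallback is allowed) and treats lookup errors as "no password". A hedged usage sketch; the alias name below is made up for illustration:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    class CredentialLookupSketch {
      // "my.keystore.password" is a hypothetical alias. Configuration.getPassword
      // asks the credential providers first, then the clear-text property, and
      // returns null if nothing matches.
      static String lookup(Configuration conf) {
        try {
          char[] pass = conf.getPassword("my.keystore.password");
          return pass == null ? null : new String(pass);
        } catch (IOException ioe) {
          return null; // mirror DFSUtil.getPassword: provider errors read as "absent"
        }
      }
    }
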
@@ -1646,9 +1668,11 @@ public class DFSUtil {
         .setKeytabConfKey(getSpnegoKeytabKey(conf, spnegoKeytabFileKey));
 
     // initialize the webserver for uploading/downloading files.
-    LOG.info("Starting web server as: "
-        + SecurityUtil.getServerPrincipal(conf.get(spnegoUserNameKey),
-            httpAddr.getHostName()));
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Starting web server as: "
+          + SecurityUtil.getServerPrincipal(conf.get(spnegoUserNameKey),
+              httpAddr.getHostName()));
+    }
 
     if (policy.isHttpEnabled()) {
       if (httpAddr.getPort() == 0) {

Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
 Tue Aug 19 23:49:39 2014
@@ -25,6 +25,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -46,6 +47,7 @@ import org.apache.hadoop.fs.FsServerDefa
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +59,7 @@ import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -66,14 +69,12 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -83,7 +84,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
@@ -1140,7 +1140,7 @@ public class DistributedFileSystem exten
       @Override
       public FileChecksum doCall(final Path p)
           throws IOException, UnresolvedLinkException {
-        return dfs.getFileChecksum(getPathName(p));
+        return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
       }
 
       @Override
@@ -1152,6 +1152,32 @@ public class DistributedFileSystem exten
   }
 
   @Override
+  public FileChecksum getFileChecksum(Path f, final long length)
+      throws IOException {
+    statistics.incrementReadOps(1);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FileChecksum>() {
+      @Override
+      public FileChecksum doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getFileChecksum(getPathName(p), length);
+      }
+
+      @Override
+      public FileChecksum next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          return ((DistributedFileSystem) fs).getFileChecksum(p, length);
+        } else {
+          throw new UnsupportedFileSystemException(
+              "getFileChecksum(Path, long) is not supported by "
+                  + fs.getClass().getSimpleName()); 
+        }
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
   public void setPermission(Path p, final FsPermission permission
       ) throws IOException {
     statistics.incrementWriteOps(1);
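
With the change above, DistributedFileSystem exposes getFileChecksum(Path, long) so callers can checksum only the first length bytes of a file; filesystems other than HDFS reached through a symlink are rejected with UnsupportedFileSystemException. A hypothetical usage sketch (the path and length are made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    class PartialChecksumExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs instanceof DistributedFileSystem) {
          // Checksum only the first 1 MB of a (hypothetical) file.
          FileChecksum sum = ((DistributedFileSystem) fs)
              .getFileChecksum(new Path("/tmp/example.dat"), 1024L * 1024L);
          System.out.println(sum);
        }
      }
    }
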
@@ -1429,7 +1455,7 @@ public class DistributedFileSystem exten
    * Get the difference between two snapshots, or between a snapshot and the
    * current tree of a directory.
    * 
-   * @see DFSClient#getSnapshotDiffReport(Path, String, String)
+   * @see DFSClient#getSnapshotDiffReport(String, String, String)
    */
   public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
       final String fromSnapshot, final String toSnapshot) throws IOException {
@@ -1769,4 +1795,127 @@ public class DistributedFileSystem exten
       }
     }.resolve(this, absF);
   }
+  
+  @Override
+  public void setXAttr(Path path, final String name, final byte[] value, 
+      final EnumSet<XAttrSetFlag> flag) throws IOException {
+    Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+
+      @Override
+      public Void doCall(final Path p) throws IOException {
+        dfs.setXAttr(getPathName(p), name, value, flag);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p) throws IOException {
+        fs.setXAttr(p, name, value, flag);
+        return null;
+      }      
+    }.resolve(this, absF);
+  }
+  
+  @Override
+  public byte[] getXAttr(Path path, final String name) throws IOException {
+    final Path absF = fixRelativePart(path);
+    return new FileSystemLinkResolver<byte[]>() {
+      @Override
+      public byte[] doCall(final Path p) throws IOException {
+        return dfs.getXAttr(getPathName(p), name);
+      }
+      @Override
+      public byte[] next(final FileSystem fs, final Path p)
+        throws IOException, UnresolvedLinkException {
+        return fs.getXAttr(p, name);
+      }
+    }.resolve(this, absF);
+  }
+  
+  @Override
+  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+    final Path absF = fixRelativePart(path);
+    return new FileSystemLinkResolver<Map<String, byte[]>>() {
+      @Override
+      public Map<String, byte[]> doCall(final Path p) throws IOException {
+        return dfs.getXAttrs(getPathName(p));
+      }
+      @Override
+      public Map<String, byte[]> next(final FileSystem fs, final Path p)
+        throws IOException, UnresolvedLinkException {
+        return fs.getXAttrs(p);
+      }
+    }.resolve(this, absF);
+  }
+  
+  @Override
+  public Map<String, byte[]> getXAttrs(Path path, final List<String> names) 
+      throws IOException {
+    final Path absF = fixRelativePart(path);
+    return new FileSystemLinkResolver<Map<String, byte[]>>() {
+      @Override
+      public Map<String, byte[]> doCall(final Path p) throws IOException {
+        return dfs.getXAttrs(getPathName(p), names);
+      }
+      @Override
+      public Map<String, byte[]> next(final FileSystem fs, final Path p)
+        throws IOException, UnresolvedLinkException {
+        return fs.getXAttrs(p, names);
+      }
+    }.resolve(this, absF);
+  }
+  
+  @Override
+  public List<String> listXAttrs(Path path)
+          throws IOException {
+    final Path absF = fixRelativePart(path);
+    return new FileSystemLinkResolver<List<String>>() {
+      @Override
+      public List<String> doCall(final Path p) throws IOException {
+        return dfs.listXAttrs(getPathName(p));
+      }
+      @Override
+      public List<String> next(final FileSystem fs, final Path p)
+              throws IOException, UnresolvedLinkException {
+        return fs.listXAttrs(p);
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
+  public void removeXAttr(Path path, final String name) throws IOException {
+    Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException {
+        dfs.removeXAttr(getPathName(p), name);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p) throws IOException {
+        fs.removeXAttr(p, name);
+        return null;
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
+  public void access(Path path, final FsAction mode) throws IOException {
+    final Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException {
+        dfs.checkAccess(getPathName(p), mode);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        fs.access(p, mode);
+        return null;
+      }
+    }.resolve(this, absF);
+  }
 }
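
The block of new overrides above wires the generic FileSystem extended-attribute calls (setXAttr, getXAttr, getXAttrs, listXAttrs, removeXAttr) and access(Path, FsAction) through FileSystemLinkResolver, so symlinks resolve the same way as for other operations. A hedged end-to-end usage sketch against the public FileSystem API; the path and attribute name are illustrative:

    import java.nio.charset.StandardCharsets;
    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.XAttrSetFlag;
    import org.apache.hadoop.fs.permission.FsAction;

    class XAttrUsageSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path p = new Path("/tmp/example.dat"); // hypothetical path

        // Create or replace a user-namespace extended attribute.
        fs.setXAttr(p, "user.origin", "ingest-job".getBytes(StandardCharsets.UTF_8),
            EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));

        byte[] value = fs.getXAttr(p, "user.origin");
        System.out.println(new String(value, StandardCharsets.UTF_8));
        System.out.println(fs.listXAttrs(p)); // attribute names only

        // Throws AccessControlException if the caller lacks READ permission.
        fs.access(p, FsAction.READ);

        fs.removeXAttr(p, "user.origin");
      }
    }
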

