Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug 19 23:49:39 2014 @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; @@ -30,19 +32,21 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; @@ -58,8 +62,6 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; import java.io.BufferedOutputStream; import java.io.DataInputStream; @@ -115,14 +117,17 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.VolumeId; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; -import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; @@ -132,6 +137,7 @@ import org.apache.hadoop.hdfs.protocol.C import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -149,16 +155,19 @@ import org.apache.hadoop.hdfs.protocol.S import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; -import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -166,6 +175,7 @@ import org.apache.hadoop.hdfs.server.com import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import 
org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; @@ -191,6 +201,7 @@ import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.net.InetAddresses; /******************************************************** @@ -205,7 +216,8 @@ import com.google.common.net.InetAddress * ********************************************************/ @InterfaceAudience.Private -public class DFSClient implements java.io.Closeable, RemotePeerFactory { +public class DFSClient implements java.io.Closeable, RemotePeerFactory, + DataEncryptionKeyFactory { public static final Log LOG = LogFactory.getLog(DFSClient.class); public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB @@ -229,7 +241,7 @@ public class DFSClient implements java.i private final Random r = new Random(); private SocketAddress[] localInterfaceAddrs; private DataEncryptionKey encryptionKey; - final TrustedChannelResolver trustedChannelResolver; + final SaslDataTransferClient saslClient; private final CachingStrategy defaultReadCachingStrategy; private final CachingStrategy defaultWriteCachingStrategy; private final ClientContext clientContext; @@ -274,6 +286,7 @@ public class DFSClient implements java.i final int retryTimesForGetLastBlockLength; final int retryIntervalForGetLastBlockLength; final long datanodeRestartTimeout; + final long dfsclientSlowIoWarningThresholdMs; final boolean useLegacyBlockReader; final boolean useLegacyBlockReaderLocal; @@ -428,6 +441,9 @@ public class DFSClient implements java.i datanodeRestartTimeout = conf.getLong( DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000; + dfsclientSlowIoWarningThresholdMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, + DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); } public boolean isUseLegacyBlockReaderLocal() { @@ -502,8 +518,8 @@ public class DFSClient implements java.i * that are currently being written by this client. * Note that a file can only be written by a single client. 
*/ - private final Map<String, DFSOutputStream> filesBeingWritten - = new HashMap<String, DFSOutputStream>(); + private final Map<Long, DFSOutputStream> filesBeingWritten + = new HashMap<Long, DFSOutputStream>(); /** * Same as this(NameNode.getAddress(conf), conf); @@ -627,7 +643,12 @@ public class DFSClient implements java.i if (numThreads > 0) { this.initThreadsNumForHedgedReads(numThreads); } - this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConfiguration()); + this.saslClient = new SaslDataTransferClient( + DataTransferSaslUtil.getSaslPropertiesResolver(conf), + TrustedChannelResolver.getInstance(conf), + conf.getBoolean( + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); } /** @@ -732,14 +753,14 @@ public class DFSClient implements java.i } /** Get a lease and start automatic renewal */ - private void beginFileLease(final String src, final DFSOutputStream out) + private void beginFileLease(final long inodeId, final DFSOutputStream out) throws IOException { - getLeaseRenewer().put(src, out, this); + getLeaseRenewer().put(inodeId, out, this); } /** Stop renewal of lease for the file. */ - void endFileLease(final String src) throws IOException { - getLeaseRenewer().closeFile(src, this); + void endFileLease(final long inodeId) throws IOException { + getLeaseRenewer().closeFile(inodeId, this); } @@ -747,9 +768,9 @@ public class DFSClient implements java.i * enforced to consistently update its local dfsclients array and * client's filesBeingWritten map. */ - void putFileBeingWritten(final String src, final DFSOutputStream out) { + void putFileBeingWritten(final long inodeId, final DFSOutputStream out) { synchronized(filesBeingWritten) { - filesBeingWritten.put(src, out); + filesBeingWritten.put(inodeId, out); // update the last lease renewal time only when there was no // writes. once there is one write stream open, the lease renewer // thread keeps it updated well with in anyone's expiration time. @@ -760,9 +781,9 @@ public class DFSClient implements java.i } /** Remove a file. Only called from LeaseRenewer. */ - void removeFileBeingWritten(final String src) { + void removeFileBeingWritten(final long inodeId) { synchronized(filesBeingWritten) { - filesBeingWritten.remove(src); + filesBeingWritten.remove(inodeId); if (filesBeingWritten.isEmpty()) { lastLeaseRenewal = 0; } @@ -847,14 +868,14 @@ public class DFSClient implements java.i /** Close/abort all files being written. */ private void closeAllFilesBeingWritten(final boolean abort) { for(;;) { - final String src; + final long inodeId; final DFSOutputStream out; synchronized(filesBeingWritten) { if (filesBeingWritten.isEmpty()) { return; } - src = filesBeingWritten.keySet().iterator().next(); - out = filesBeingWritten.remove(src); + inodeId = filesBeingWritten.keySet().iterator().next(); + out = filesBeingWritten.remove(inodeId); } if (out != null) { try { @@ -864,8 +885,8 @@ public class DFSClient implements java.i out.close(); } } catch(IOException ie) { - LOG.error("Failed to " + (abort? "abort": "close") + " file " + src, - ie); + LOG.error("Failed to " + (abort? 
"abort": "close") + + " inode " + inodeId, ie); } } } @@ -1463,7 +1484,7 @@ public class DFSClient implements java.i final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs); - beginFileLease(src, result); + beginFileLease(result.getFileId(), result); return result; } @@ -1511,7 +1532,7 @@ public class DFSClient implements java.i flag, createParent, replication, blockSize, progress, buffersize, checksum); } - beginFileLease(src, result); + beginFileLease(result.getFileId(), result); return result; } @@ -1599,14 +1620,14 @@ public class DFSClient implements java.i + src + " on client " + clientName); } final DFSOutputStream result = callAppend(stat, src, buffersize, progress); - beginFileLease(src, result); + beginFileLease(result.getFileId(), result); return result; } /** * Set replication for an existing file. * @param src file name - * @param replication + * @param replication replication to set the file to * * @see ClientProtocol#setReplication(String, short) */ @@ -1797,19 +1818,6 @@ public class DFSClient implements java.i UnresolvedPathException.class); } } - - /** - * Get the checksum of a file. - * @param src The file path - * @return The checksum - * @see DistributedFileSystem#getFileChecksum(Path) - */ - public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException { - checkOpen(); - return getFileChecksum(src, clientName, namenode, socketFactory, - dfsClientConf.socketTimeout, getDataEncryptionKey(), - dfsClientConf.connectToDnViaHostname); - } @InterfaceAudience.Private public void clearDataEncryptionKey() { @@ -1829,11 +1837,9 @@ public class DFSClient implements java.i return d == null ? false : d.getEncryptDataTransfer(); } - @InterfaceAudience.Private - public DataEncryptionKey getDataEncryptionKey() - throws IOException { - if (shouldEncryptData() && - !this.trustedChannelResolver.isTrusted()) { + @Override + public DataEncryptionKey newDataEncryptionKey() throws IOException { + if (shouldEncryptData()) { synchronized (this) { if (encryptionKey == null || encryptionKey.expiryDate < Time.now()) { @@ -1848,23 +1854,20 @@ public class DFSClient implements java.i } /** - * Get the checksum of a file. + * Get the checksum of the whole file of a range of the file. Note that the + * range always starts from the beginning of the file. * @param src The file path - * @param clientName the name of the client requesting the checksum. 
- * @param namenode the RPC proxy for the namenode - * @param socketFactory to create sockets to connect to DNs - * @param socketTimeout timeout to use when connecting and waiting for a response - * @param encryptionKey the key needed to communicate with DNs in this cluster - * @param connectToDnViaHostname whether the client should use hostnames instead of IPs + * @param length the length of the range, i.e., the range is [0, length] * @return The checksum + * @see DistributedFileSystem#getFileChecksum(Path) */ - private static MD5MD5CRC32FileChecksum getFileChecksum(String src, - String clientName, - ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout, - DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) + public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) throws IOException { - //get all block locations - LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE); + checkOpen(); + Preconditions.checkArgument(length >= 0); + //get block locations for the file range + LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, + length); if (null == blockLocations) { throw new FileNotFoundException("File does not exist: " + src); } @@ -1876,10 +1879,11 @@ public class DFSClient implements java.i boolean refetchBlocks = false; int lastRetriedIndex = -1; - //get block checksum for each block - for(int i = 0; i < locatedblocks.size(); i++) { + // get block checksum for each block + long remaining = length; + for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) { if (refetchBlocks) { // refetch to get fresh tokens - blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE); + blockLocations = callGetBlockLocations(namenode, src, 0, length); if (null == blockLocations) { throw new FileNotFoundException("File does not exist: " + src); } @@ -1888,10 +1892,14 @@ public class DFSClient implements java.i } LocatedBlock lb = locatedblocks.get(i); final ExtendedBlock block = lb.getBlock(); + if (remaining < block.getNumBytes()) { + block.setNumBytes(remaining); + } + remaining -= block.getNumBytes(); final DatanodeInfo[] datanodes = lb.getLocations(); //try each datanode location of the block - final int timeout = 3000 * datanodes.length + socketTimeout; + final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout; boolean done = false; for(int j = 0; !done && j < datanodes.length; j++) { DataOutputStream out = null; @@ -1899,8 +1907,7 @@ public class DFSClient implements java.i try { //connect to a datanode - IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, - encryptionKey, datanodes[j], timeout); + IOStreamPair pair = connectToDN(datanodes[j], timeout, lb); out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(pair.in); @@ -1956,9 +1963,7 @@ public class DFSClient implements java.i } else { LOG.debug("Retrieving checksum from an earlier-version DataNode: " + "inferring checksum by reading first byte"); - ct = inferChecksumTypeByReading( - clientName, socketFactory, socketTimeout, lb, datanodes[j], - encryptionKey, connectToDnViaHostname); + ct = inferChecksumTypeByReading(lb, datanodes[j]); } if (i == 0) { // first block @@ -2032,16 +2037,13 @@ public class DFSClient implements java.i * Connect to the given datanode's datantrasfer port, and return * the resulting IOStreamPair. This includes encryption wrapping, etc. 
*/ - private static IOStreamPair connectToDN( - SocketFactory socketFactory, boolean connectToDnViaHostname, - DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout) - throws IOException - { + private IOStreamPair connectToDN(DatanodeInfo dn, int timeout, + LocatedBlock lb) throws IOException { boolean success = false; Socket sock = null; try { sock = socketFactory.createSocket(); - String dnAddr = dn.getXferAddr(connectToDnViaHostname); + String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname); if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode " + dnAddr); } @@ -2050,13 +2052,8 @@ public class DFSClient implements java.i OutputStream unbufOut = NetUtils.getOutputStream(sock); InputStream unbufIn = NetUtils.getInputStream(sock); - IOStreamPair ret; - if (encryptionKey != null) { - ret = DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, encryptionKey); - } else { - ret = new IOStreamPair(unbufIn, unbufOut); - } + IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this, + lb.getBlockToken(), dn); success = true; return ret; } finally { @@ -2072,21 +2069,14 @@ public class DFSClient implements java.i * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * - * @param in input stream from datanode - * @param out output stream to datanode * @param lb the located block - * @param clientName the name of the DFSClient requesting the checksum * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ - private static Type inferChecksumTypeByReading( - String clientName, SocketFactory socketFactory, int socketTimeout, - LocatedBlock lb, DatanodeInfo dn, - DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) + private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { - IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, - encryptionKey, dn, socketTimeout); + IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, @@ -2116,7 +2106,7 @@ public class DFSClient implements java.i /** * Set permissions to a file or directory. * @param src path name. - * @param permission + * @param permission permission to set to * * @see ClientProtocol#setPermission(String, FsPermission) */ @@ -2204,6 +2194,11 @@ public class DFSClient implements java.i return namenode.getDatanodeReport(type); } + public DatanodeStorageReport[] getDatanodeStorageReport( + DatanodeReportType type) throws IOException { + return namenode.getDatanodeStorageReport(type); + } + /** * Enter, leave or get safe mode. 
* @@ -2436,8 +2431,8 @@ public class DFSClient implements java.i } @VisibleForTesting - ExtendedBlock getPreviousBlock(String file) { - return filesBeingWritten.get(file).getBlock(); + ExtendedBlock getPreviousBlock(long fileId) { + return filesBeingWritten.get(fileId).getBlock(); } /** @@ -2757,9 +2752,102 @@ public class DFSClient implements java.i UnresolvedPathException.class); } } + + public void setXAttr(String src, String name, byte[] value, + EnumSet<XAttrSetFlag> flag) throws IOException { + checkOpen(); + try { + namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag); + } catch (RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + NSQuotaExceededException.class, + SafeModeException.class, + SnapshotAccessControlException.class, + UnresolvedPathException.class); + } + } + + public byte[] getXAttr(String src, String name) throws IOException { + checkOpen(); + try { + final List<XAttr> xAttrs = XAttrHelper.buildXAttrAsList(name); + final List<XAttr> result = namenode.getXAttrs(src, xAttrs); + return XAttrHelper.getFirstXAttrValue(result); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } + } + + public Map<String, byte[]> getXAttrs(String src) throws IOException { + checkOpen(); + try { + return XAttrHelper.buildXAttrMap(namenode.getXAttrs(src, null)); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } + } + + public Map<String, byte[]> getXAttrs(String src, List<String> names) + throws IOException { + checkOpen(); + try { + return XAttrHelper.buildXAttrMap(namenode.getXAttrs( + src, XAttrHelper.buildXAttrs(names))); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } + } + + public List<String> listXAttrs(String src) + throws IOException { + checkOpen(); + try { + final Map<String, byte[]> xattrs = + XAttrHelper.buildXAttrMap(namenode.listXAttrs(src)); + return Lists.newArrayList(xattrs.keySet()); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } + } + + public void removeXAttr(String src, String name) throws IOException { + checkOpen(); + try { + namenode.removeXAttr(src, XAttrHelper.buildXAttr(name)); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + NSQuotaExceededException.class, + SafeModeException.class, + SnapshotAccessControlException.class, + UnresolvedPathException.class); + } + } + + public void checkAccess(String src, FsAction mode) throws IOException { + checkOpen(); + try { + namenode.checkAccess(src, mode); + } catch (RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } + } @Override // RemotePeerFactory - public Peer newConnectedPeer(InetSocketAddress addr) throws IOException { + public Peer newConnectedPeer(InetSocketAddress addr, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) + throws IOException { Peer peer = null; boolean success = false; Socket sock = null; @@ -2768,8 +2856,8 @@ public class DFSClient implements java.i NetUtils.connect(sock, addr, 
getRandomLocalInterfaceAddr(), dfsClientConf.socketTimeout); - peer = TcpPeerServer.peerFromSocketAndKey(sock, - getDataEncryptionKey()); + peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, + blockToken, datanodeId); success = true; return peer; } finally {
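
The DFSClient hunks above replace the DataTransferEncryptor/TrustedChannelResolver plumbing with a single SaslDataTransferClient, and DFSClient itself now implements DataEncryptionKeyFactory so the SASL layer can ask for a fresh key on demand. The sketch below isolates that wiring; only the SaslDataTransferClient, DataTransferSaslUtil and NetUtils calls mirror the diff, while the class name, the cached-key field and the wrap() helper are illustrative stand-ins rather than HDFS code.

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
    import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
    import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
    import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
    import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.token.Token;

    // Illustrative wrapper, not part of HDFS: shows how the patch builds and uses
    // the SASL client for DataTransferProtocol connections.
    class SaslWiringSketch implements DataEncryptionKeyFactory {
      private final SaslDataTransferClient saslClient;
      private final DataEncryptionKey key; // assumed to have been fetched from the NameNode

      SaslWiringSketch(Configuration conf, DataEncryptionKey key) throws IOException {
        // Same three constructor arguments the patch passes in the DFSClient constructor.
        this.saslClient = new SaslDataTransferClient(
            DataTransferSaslUtil.getSaslPropertiesResolver(conf),
            TrustedChannelResolver.getInstance(conf),
            conf.getBoolean(
                CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
                CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
        this.key = key;
      }

      @Override
      public DataEncryptionKey newDataEncryptionKey() {
        // DFSClient refreshes this from the NameNode when it expires; a fixed key stands in here.
        return key;
      }

      /** Negotiate SASL/encryption on a connected socket, as the new connectToDN() does. */
      IOStreamPair wrap(Socket sock, DatanodeInfo dn,
          Token<BlockTokenIdentifier> blockToken) throws IOException {
        OutputStream unbufOut = NetUtils.getOutputStream(sock);
        InputStream unbufIn = NetUtils.getInputStream(sock);
        return saslClient.newSocketSend(sock, unbufOut, unbufIn, this, blockToken, dn);
      }
    }
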
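The reworked getFileChecksum(src, length) only visits the blocks that cover the range [0, length], trimming the last block through the new "remaining" counter. The self-contained sketch below reproduces just that bookkeeping with plain longs in place of LocatedBlock/ExtendedBlock, so it illustrates the loop rather than the HDFS API; the block sizes and 300 MB range in main() are made up.

    import java.util.Arrays;
    import java.util.List;

    // Illustrative only: mirrors the "remaining" bookkeeping added to
    // DFSClient.getFileChecksum(String src, long length).
    final class ChecksumRangeSketch {
      private ChecksumRangeSketch() {}

      /**
       * Given per-block sizes and a requested range [0, length], return how many
       * bytes of each block a ranged checksum request has to cover.
       */
      static long[] bytesPerBlock(List<Long> blockSizes, long length) {
        if (length < 0) {
          throw new IllegalArgumentException("length must be >= 0");
        }
        long[] covered = new long[blockSizes.size()];
        long remaining = length;
        for (int i = 0; i < blockSizes.size() && remaining > 0; i++) {
          long blockBytes = blockSizes.get(i);
          if (remaining < blockBytes) {
            blockBytes = remaining;   // trim the final, partially covered block
          }
          remaining -= blockBytes;
          covered[i] = blockBytes;    // this many bytes of block i go into the checksum
        }
        return covered;
      }

      public static void main(String[] args) {
        // Three 128 MB blocks, checksum requested over the first 300 MB:
        // the third block is trimmed to 44 MB and later blocks are never visited.
        long mb = 1024L * 1024L;
        long[] covered = bytesPerBlock(Arrays.asList(128 * mb, 128 * mb, 128 * mb), 300 * mb);
        for (long c : covered) {
          System.out.println(c / mb + " MB");
        }
      }
    }
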
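The diff also adds the client side of extended attributes (setXAttr, getXAttr, getXAttrs, listXAttrs, removeXAttr, plus checkAccess). A minimal usage sketch, assuming the matching FileSystem-level wrappers that ship alongside these DFSClient methods in this release line; the path, attribute name and value are made up.

    import java.io.IOException;
    import java.nio.charset.Charset;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class XAttrUsageSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);            // a DistributedFileSystem against HDFS
        Path file = new Path("/tmp/xattr-demo");
        fs.create(file).close();

        // Attribute names are namespaced ("user.", "trusted.", ...);
        // XAttrHelper.buildXAttr enforces that on the DFSClient side.
        Charset utf8 = Charset.forName("UTF-8");
        fs.setXAttr(file, "user.origin", "ingest-job-42".getBytes(utf8));

        byte[] value = fs.getXAttr(file, "user.origin");
        List<String> names = fs.listXAttrs(file);
        Map<String, byte[]> all = fs.getXAttrs(file);
        System.out.println(new String(value, utf8) + " / " + names + " / " + all.size());

        fs.removeXAttr(file, "user.origin");
        fs.delete(file, true);
      }
    }
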
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Tue Aug 19 23:49:39 2014 @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs; +import java.util.concurrent.atomic.AtomicLong; + import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; @@ -29,6 +31,7 @@ import org.apache.hadoop.classification. @InterfaceAudience.Private public class DFSClientFaultInjector { public static DFSClientFaultInjector instance = new DFSClientFaultInjector(); + public static AtomicLong exceptionNum = new AtomicLong(0); public static DFSClientFaultInjector get() { return instance; @@ -47,4 +50,8 @@ public class DFSClientFaultInjector { } public void startFetchFromDatanode() {} + + public void fetchFromDatanodeException() {} + + public void readFromDatanodeDelay() {} } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Aug 19 23:49:39 2014 @@ -105,12 +105,16 @@ public class DFSConfigKeys extends Commo public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address"; public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec"; public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024; + public static final String DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves"; + public static final int DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5; public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes"; public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false; public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes"; public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false; + public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY = "dfs.datanode.sync.behind.writes.in.background"; + public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT = false; public static 
final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads"; public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false; public static final String DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname"; @@ -126,6 +130,7 @@ public class DFSConfigKeys extends Commo public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070; public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT; + public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host"; public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host"; public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address"; @@ -191,9 +196,14 @@ public class DFSConfigKeys extends Commo public static final String DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup"; public static final String DFS_NAMENODE_ACLS_ENABLED_KEY = "dfs.namenode.acls.enabled"; public static final boolean DFS_NAMENODE_ACLS_ENABLED_DEFAULT = false; + public static final String DFS_NAMENODE_XATTRS_ENABLED_KEY = "dfs.namenode.xattrs.enabled"; + public static final boolean DFS_NAMENODE_XATTRS_ENABLED_DEFAULT = true; public static final String DFS_ADMIN = "dfs.cluster.administrators"; public static final String DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.https.server.keystore.resource"; public static final String DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-server.xml"; + public static final String DFS_SERVER_HTTPS_KEYPASSWORD_KEY = "ssl.server.keystore.keypassword"; + public static final String DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY = "ssl.server.keystore.password"; + public static final String DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY = "ssl.server.truststore.password"; public static final String DFS_NAMENODE_NAME_DIR_RESTORE_KEY = "dfs.namenode.name.dir.restore"; public static final boolean DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT = false; public static final String DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY = "dfs.namenode.support.allow.format"; @@ -207,6 +217,9 @@ public class DFSConfigKeys extends Commo public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version"; public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT"; + public static final String DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK = "dfs.namenode.randomize-block-locations-per-block"; + public static final boolean DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT = false; + public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum"; public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1; @@ -244,6 +257,10 @@ public class DFSConfigKeys extends Commo "dfs.namenode.path.based.cache.refresh.interval.ms"; public static final long DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 30000L; + /** Pending period of block deletion since NameNode startup */ + public static final String DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY = "dfs.namenode.startup.delay.block.deletion.sec"; + public static final long DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT = 0L; + // Whether to enable datanode's stale state detection and usage for reads public static final 
String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode"; public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false; @@ -290,11 +307,17 @@ public class DFSConfigKeys extends Commo public static final long DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT = 1024*1024; public static final String DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY = "dfs.namenode.fs-limits.max-blocks-per-file"; public static final long DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT = 1024*1024; + public static final String DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY = "dfs.namenode.fs-limits.max-xattrs-per-inode"; + public static final int DFS_NAMENODE_MAX_XATTRS_PER_INODE_DEFAULT = 32; + public static final String DFS_NAMENODE_MAX_XATTR_SIZE_KEY = "dfs.namenode.fs-limits.max-xattr-size"; + public static final int DFS_NAMENODE_MAX_XATTR_SIZE_DEFAULT = 16384; + //Following keys have no defaults public static final String DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir"; public static final int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470; public static final String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address"; + public static final String DFS_NAMENODE_HTTPS_BIND_HOST_KEY = "dfs.namenode.https-bind-host"; public static final String DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTPS_PORT_DEFAULT; public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir"; public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir"; @@ -361,8 +384,6 @@ public class DFSConfigKeys extends Commo public static final String DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT; public static final String DFS_DATANODE_MAX_RECEIVER_THREADS_KEY = "dfs.datanode.max.transfer.threads"; public static final int DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096; - public static final String DFS_DATANODE_NUMBLOCKS_KEY = "dfs.datanode.numblocks"; - public static final int DFS_DATANODE_NUMBLOCKS_DEFAULT = 64; public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours"; public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0; public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed"; @@ -379,8 +400,6 @@ public class DFSConfigKeys extends Commo public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10; public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count"; public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10; - public static final String DFS_SUPPORT_APPEND_KEY = "dfs.support.append"; - public static final boolean DFS_SUPPORT_APPEND_DEFAULT = true; public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy"; public static final String DFS_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY.name(); public static final String DFS_DEFAULT_CHUNK_VIEW_SIZE_KEY = "dfs.default.chunk.view.size"; @@ -481,18 +500,29 @@ public class DFSConfigKeys extends Commo public static final String DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup"; public static final String DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file"; public static final String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal"; + @Deprecated + public static final String DFS_DATANODE_USER_NAME_KEY = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths"; public static final String 
DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT = "/dev/shm,/tmp"; public static final String DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms"; public static final int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000; public static final String DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file"; public static final String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY = "dfs.namenode.kerberos.principal"; + @Deprecated + public static final String DFS_NAMENODE_USER_NAME_KEY = DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; public static final String DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.namenode.kerberos.internal.spnego.principal"; + @Deprecated + public static final String DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY = DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY; public static final String DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY = "dfs.secondary.namenode.keytab.file"; public static final String DFS_SECONDARY_NAMENODE_KERBEROS_PRINCIPAL_KEY = "dfs.secondary.namenode.kerberos.principal"; + @Deprecated + public static final String DFS_SECONDARY_NAMENODE_USER_NAME_KEY = DFS_SECONDARY_NAMENODE_KERBEROS_PRINCIPAL_KEY; public static final String DFS_SECONDARY_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.secondary.namenode.kerberos.internal.spnego.principal"; + @Deprecated + public static final String DFS_SECONDARY_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY = DFS_SECONDARY_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY; public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold"; public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10; + public static final String DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY = "dfs.namenode.legacy-oiv-image.dir"; public static final String DFS_NAMESERVICES = "dfs.nameservices"; public static final String DFS_NAMESERVICE_ID = "dfs.nameservice.id"; @@ -535,6 +565,8 @@ public class DFSConfigKeys extends Commo public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false; public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm"; public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class"; + public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection"; + public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class"; // Journal-node related configs. These are read on the JN side. public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir"; @@ -626,7 +658,17 @@ public class DFSConfigKeys extends Commo public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = "dfs.client.hedged.read.threadpool.size"; - public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0; - public static final String DFS_NFS_KEYTAB_FILE_KEY = "dfs.nfs.keytab.file"; - public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "dfs.nfs.kerberos.principal"; + public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0; + + // Slow io warning log threshold settings for dfsclient and datanode. 
+ public static final String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY = + "dfs.client.slow.io.warning.threshold.ms"; + public static final long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000; + public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY = + "dfs.datanode.slow.io.warning.threshold.ms"; + public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300; + + public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY = + "dfs.datanode.block.id.layout.upgrade.threads"; + public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Aug 19 23:49:39 2014 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import java.io.EOFException; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -32,12 +33,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -81,6 +84,7 @@ implements ByteBufferReadable, CanSetDro HasEnhancedByteBufferAccess { @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; + private long hedgedReadOpsLoopNumForTesting = 0; private final DFSClient dfsClient; private boolean closed = false; private final String src; @@ -389,7 +393,7 @@ implements ByteBufferReadable, CanSetDro * Get block at the specified position. * Fetch it from the namenode if not cached. * - * @param offset + * @param offset block corresponding to this offset in file is returned * @param updatePosition whether to update current position * @return located block * @throws IOException @@ -453,14 +457,13 @@ implements ByteBufferReadable, CanSetDro * Get blocks in the specified range. * Fetch them from the namenode if not cached. This function * will not get a read request beyond the EOF. 
- * @param offset - * @param length + * @param offset starting offset in file + * @param length length of data * @return consequent segment of located blocks * @throws IOException */ - private synchronized List<LocatedBlock> getBlockRange(long offset, - long length) - throws IOException { + private synchronized List<LocatedBlock> getBlockRange(long offset, + long length) throws IOException { // getFileLength(): returns total file length // locatedBlocks.getFileLength(): returns length of completed blocks if (offset >= getFileLength()) { @@ -847,7 +850,6 @@ implements ByteBufferReadable, CanSetDro /** * Add corrupted block replica into map. - * @param corruptedBlockMap */ private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { @@ -978,20 +980,15 @@ implements ByteBufferReadable, CanSetDro private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode, final LocatedBlock block, final long start, final long end, final ByteBuffer bb, - final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, - final CountDownLatch latch) { + final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { return new Callable<ByteBuffer>() { @Override public ByteBuffer call() throws Exception { - try { - byte[] buf = bb.array(); - int offset = bb.position(); - actualGetFromOneDataNode(datanode, block, start, end, buf, offset, - corruptedBlockMap); - return bb; - } finally { - latch.countDown(); - } + byte[] buf = bb.array(); + int offset = bb.position(); + actualGetFromOneDataNode(datanode, block, start, end, buf, offset, + corruptedBlockMap); + return bb; } }; } @@ -1020,6 +1017,7 @@ implements ByteBufferReadable, CanSetDro BlockReader reader = null; try { + DFSClientFaultInjector.get().fetchFromDatanodeException(); Token<BlockTokenIdentifier> blockToken = block.getBlockToken(); int len = (int) (end - start + 1); reader = new BlockReaderFactory(dfsClient.getConf()). @@ -1040,10 +1038,13 @@ implements ByteBufferReadable, CanSetDro setConfiguration(dfsClient.getConfiguration()). build(); int nread = reader.readAll(buf, offset, len); + updateReadStatistics(readStatistics, nread, reader); + if (nread != len) { throw new IOException("truncated return from reader.read(): " + "excpected " + len + ", got " + nread); } + DFSClientFaultInjector.get().readFromDatanodeDelay(); return; } catch (ChecksumException e) { String msg = "fetchBlockByteRange(). Got a checksum exception for " @@ -1091,49 +1092,49 @@ implements ByteBufferReadable, CanSetDro * int, Map)} except we start up a second, parallel, 'hedged' read * if the first read is taking longer than configured amount of * time. We then wait on which ever read returns first. - * - * @param block - * @param start - * @param end - * @param buf - * @param offset - * @param corruptedBlockMap - * @throws IOException */ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, long end, byte[] buf, int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException { ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>(); + CompletionService<ByteBuffer> hedgedService = + new ExecutorCompletionService<ByteBuffer>( + dfsClient.getHedgedReadsThreadPool()); ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>(); ByteBuffer bb = null; int len = (int) (end - start + 1); block = getBlockAt(block.getStartOffset(), false); - // Latch shared by all outstanding reads. 
First to finish closes - CountDownLatch hasReceivedResult = new CountDownLatch(1); while (true) { + // see HDFS-6591, this metric is used to verify/catch unnecessary loops + hedgedReadOpsLoopNumForTesting++; DNAddrPair chosenNode = null; - Future<ByteBuffer> future = null; - // futures is null if there is no request already executing. + // there is no request already executing. if (futures.isEmpty()) { - // chooseDataNode is a commitment. If no node, we go to - // the NN to reget block locations. Only go here on first read. + // chooseDataNode is a commitment. If no node, we go to + // the NN to reget block locations. Only go here on first read. chosenNode = chooseDataNode(block, ignored); bb = ByteBuffer.wrap(buf, offset, len); - future = getHedgedReadFuture(chosenNode, block, start, end, bb, - corruptedBlockMap, hasReceivedResult); + Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( + chosenNode, block, start, end, bb, corruptedBlockMap); + Future<ByteBuffer> firstRequest = hedgedService + .submit(getFromDataNodeCallable); + futures.add(firstRequest); try { - future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS); - return; - } catch (TimeoutException e) { + Future<ByteBuffer> future = hedgedService.poll( + dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS); + if (future != null) { + future.get(); + return; + } if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() + - "ms to read from " + chosenNode.info + "; spawning hedged read"); + DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() + + "ms to read from " + chosenNode.info + + "; spawning hedged read"); } // Ignore this node on next go around. ignored.add(chosenNode.info); dfsClient.getHedgedReadMetrics().incHedgedReadOps(); - futures.add(future); continue; // no need to refresh block locations } catch (InterruptedException e) { // Ignore @@ -1141,25 +1142,31 @@ implements ByteBufferReadable, CanSetDro // Ignore already logged in the call. } } else { - // We are starting up a 'hedged' read. We have a read already + // We are starting up a 'hedged' read. We have a read already // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode. // If no nodes to do hedged reads against, pass. try { - chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored); + try { + chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored); + } catch (IOException ioe) { + chosenNode = chooseDataNode(block, ignored); + } bb = ByteBuffer.allocate(len); - future = getHedgedReadFuture(chosenNode, block, start, end, bb, - corruptedBlockMap, hasReceivedResult); - futures.add(future); + Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( + chosenNode, block, start, end, bb, corruptedBlockMap); + Future<ByteBuffer> oneMoreRequest = hedgedService + .submit(getFromDataNodeCallable); + futures.add(oneMoreRequest); } catch (IOException ioe) { if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Failed getting node for hedged read: " + - ioe.getMessage()); + DFSClient.LOG.debug("Failed getting node for hedged read: " + + ioe.getMessage()); } } // if not succeeded. Submit callables for each datanode in a loop, wait // for a fixed interval and get the result from the fastest one. try { - ByteBuffer result = getFirstToComplete(futures, hasReceivedResult); + ByteBuffer result = getFirstToComplete(hedgedService, futures); // cancel the rest. 
cancelAll(futures); if (result.array() != buf) { // compare the array pointers @@ -1171,50 +1178,43 @@ implements ByteBufferReadable, CanSetDro } return; } catch (InterruptedException ie) { - // Ignore - } catch (ExecutionException e) { - // exception already handled in the call method. getFirstToComplete - // will remove the failing future from the list. nothing more to do. + // Ignore and retry } - // We got here if exception. Ignore this node on next go around IFF + // We got here if exception. Ignore this node on next go around IFF // we found a chosenNode to hedge read against. if (chosenNode != null && chosenNode.info != null) { ignored.add(chosenNode.info); } } - // executed if we get an error from a data node - block = getBlockAt(block.getStartOffset(), false); } } - private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode, - final LocatedBlock block, long start, - final long end, final ByteBuffer bb, - final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, - final CountDownLatch hasReceivedResult) { - Callable<ByteBuffer> getFromDataNodeCallable = - getFromOneDataNode(chosenNode, block, start, end, bb, - corruptedBlockMap, hasReceivedResult); - return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable); + @VisibleForTesting + public long getHedgedReadOpsLoopNumForTesting() { + return hedgedReadOpsLoopNumForTesting; } - private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures, - CountDownLatch latch) throws ExecutionException, InterruptedException { - latch.await(); - for (Future<ByteBuffer> future : futures) { - if (future.isDone()) { - try { - return future.get(); - } catch (ExecutionException e) { - // already logged in the Callable - futures.remove(future); - throw e; - } - } + private ByteBuffer getFirstToComplete( + CompletionService<ByteBuffer> hedgedService, + ArrayList<Future<ByteBuffer>> futures) throws InterruptedException { + if (futures.isEmpty()) { + throw new InterruptedException("let's retry"); } - throw new InterruptedException("latch has counted down to zero but no" - + "result available yet, for safety try to request another one from" - + "outside loop, this should be rare"); + Future<ByteBuffer> future = null; + try { + future = hedgedService.take(); + ByteBuffer bb = future.get(); + futures.remove(future); + return bb; + } catch (ExecutionException e) { + // already logged in the Callable + futures.remove(future); + } catch (CancellationException ce) { + // already logged in the Callable + futures.remove(future); + } + + throw new InterruptedException("let's retry"); } private void cancelAll(List<Future<ByteBuffer>> futures) { @@ -1375,10 +1375,10 @@ implements ByteBufferReadable, CanSetDro @Override public synchronized void seek(long targetPos) throws IOException { if (targetPos > getFileLength()) { - throw new IOException("Cannot seek after EOF"); + throw new EOFException("Cannot seek after EOF"); } if (targetPos < 0) { - throw new IOException("Cannot seek to negative offset"); + throw new EOFException("Cannot seek to negative offset"); } if (closed) { throw new IOException("Stream is closed!"); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff 
============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Aug 19 23:49:39 2014 @@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.N import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; @@ -122,6 +121,7 @@ public class DFSOutputStream extends FSO implements Syncable, CanSetDropBehind { private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB private final DFSClient dfsClient; + private final long dfsclientSlowLogThresholdMs; private Socket s; // closed is accessed by different threads under different locks. private volatile boolean closed = false; @@ -313,6 +313,7 @@ public class DFSOutputStream extends FSO private DataInputStream blockReplyStream; private ResponseProcessor response = null; private volatile DatanodeInfo[] nodes = null; // list of targets for current block + private volatile StorageType[] storageTypes = null; private volatile String[] storageIDs = null; private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes = CacheBuilder.newBuilder() @@ -417,10 +418,12 @@ public class DFSOutputStream extends FSO } private void setPipeline(LocatedBlock lb) { - setPipeline(lb.getLocations(), lb.getStorageIDs()); + setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); } - private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) { + private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, + String[] storageIDs) { this.nodes = nodes; + this.storageTypes = storageTypes; this.storageIDs = storageIDs; } @@ -446,7 +449,7 @@ public class DFSOutputStream extends FSO this.setName("DataStreamer for file " + src); closeResponder(); closeStream(); - setPipeline(null, null); + setPipeline(null, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -788,11 +791,19 @@ public class DFSOutputStream extends FSO // process responses from datanodes. try { // read an ack from the pipeline + long begin = Time.monotonicNow(); ack.readFields(blockReplyStream); - if (DFSClient.LOG.isDebugEnabled()) { + long duration = Time.monotonicNow() - begin; + if (duration > dfsclientSlowLogThresholdMs + && ack.getSeqno() != Packet.HEART_BEAT_SEQNO) { + DFSClient.LOG + .warn("Slow ReadProcessor read fields took " + duration + + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " + + ack + ", targets: " + Arrays.asList(targets)); + } else if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient " + ack); } - + long seqno = ack.getSeqno(); // processes response status from datanodes. 
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) { @@ -1012,7 +1023,7 @@ public class DFSOutputStream extends FSO //get a new datanode final DatanodeInfo[] original = nodes; final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode( - src, block, nodes, storageIDs, + src, fileId, block, nodes, storageIDs, failed.toArray(new DatanodeInfo[failed.size()]), 1, dfsClient.clientName); setPipeline(lb); @@ -1023,10 +1034,12 @@ public class DFSOutputStream extends FSO //transfer replica final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1]; final DatanodeInfo[] targets = {nodes[d]}; - transfer(src, targets, lb.getBlockToken()); + final StorageType[] targetStorageTypes = {storageTypes[d]}; + transfer(src, targets, targetStorageTypes, lb.getBlockToken()); } private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final Token<BlockTokenIdentifier> blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; @@ -1038,21 +1051,17 @@ public class DFSOutputStream extends FSO OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); - if (dfsClient.shouldEncryptData() && - !dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } + IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock, + unbufOut, unbufIn, dfsClient, blockToken, src); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, - targets); + targets, targetStorageTypes); out.flush(); //ack @@ -1131,16 +1140,15 @@ public class DFSOutputStream extends FSO failed.add(nodes[errorIndex]); DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1]; - System.arraycopy(nodes, 0, newnodes, 0, errorIndex); - System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex, - newnodes.length-errorIndex); + arraycopy(nodes, newnodes, errorIndex); + + final StorageType[] newStorageTypes = new StorageType[newnodes.length]; + arraycopy(storageTypes, newStorageTypes, errorIndex); final String[] newStorageIDs = new String[newnodes.length]; - System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex); - System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex, - newStorageIDs.length-errorIndex); + arraycopy(storageIDs, newStorageIDs, errorIndex); - setPipeline(newnodes, newStorageIDs); + setPipeline(newnodes, newStorageTypes, newStorageIDs); // Just took care of a node error while waiting for a node restart if (restartingNodeIndex >= 0) { @@ -1177,7 +1185,7 @@ public class DFSOutputStream extends FSO // set up the pipeline again with the remaining nodes if (failPacket) { // for testing - success = createBlockOutputStream(nodes, newGS, isRecovery); + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); failPacket = false; try { // Give DNs time to send in bad reports. 
In real situations, @@ -1186,7 +1194,7 @@ public class DFSOutputStream extends FSO Thread.sleep(2000); } catch (InterruptedException ie) {} } else { - success = createBlockOutputStream(nodes, newGS, isRecovery); + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); } if (restartingNodeIndex >= 0) { @@ -1238,6 +1246,7 @@ public class DFSOutputStream extends FSO private LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb = null; DatanodeInfo[] nodes = null; + StorageType[] storageTypes = null; int count = dfsClient.getConf().nBlockWriteRetry; boolean success = false; ExtendedBlock oldBlock = block; @@ -1260,15 +1269,17 @@ public class DFSOutputStream extends FSO bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); + storageTypes = lb.getStorageTypes(); // // Connect to first DataNode in the list. // - success = createBlockOutputStream(nodes, 0L, false); + success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { DFSClient.LOG.info("Abandoning " + block); - dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName); + dfsClient.namenode.abandonBlock(block, fileId, src, + dfsClient.clientName); block = null; DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]); excludedNodes.put(nodes[errorIndex], nodes[errorIndex]); @@ -1284,8 +1295,8 @@ public class DFSOutputStream extends FSO // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. // - private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, - boolean recoveryFlag) { + private boolean createBlockOutputStream(DatanodeInfo[] nodes, + StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { if (nodes.length == 0) { DFSClient.LOG.info("nodes are empty for write pipeline of block " + block); @@ -1315,14 +1326,10 @@ public class DFSOutputStream extends FSO OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(s); - if (dfsClient.shouldEncryptData() && - !dfsClient.trustedChannelResolver.isTrusted(s.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams(unbufOut, - unbufIn, dfsClient.getDataEncryptionKey()); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } + IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s, + unbufOut, unbufIn, dfsClient, accessToken, nodes[0]); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); blockReplyStream = new DataInputStream(unbufIn); @@ -1331,9 +1338,10 @@ public class DFSOutputStream extends FSO // Xmit header info to datanode // + BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage; // send the request - new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, - nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, + new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken, + dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length, block.getNumBytes(), bytesSent, newGS, checksum, cachingStrategy.get()); @@ -1569,6 +1577,8 @@ public class DFSOutputStream extends FSO } this.checksum = checksum; + this.dfsclientSlowLogThresholdMs = + dfsClient.getConf().dfsclientSlowIoWarningThresholdMs; } /** Construct a new output stream for creating a file. */ @@ -1914,7 +1924,8 @@ public class DFSOutputStream extends FSO // namenode. 
if (persistBlocks.getAndSet(false) || updateLength) { try { - dfsClient.namenode.fsync(src, dfsClient.clientName, lastBlockLength); + dfsClient.namenode.fsync(src, fileId, + dfsClient.clientName, lastBlockLength); } catch (IOException ioe) { DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe); // If we got an error here, it might be because some other thread called @@ -1999,6 +2010,7 @@ public class DFSOutputStream extends FSO if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Waiting for ack for: " + seqno); } + long begin = Time.monotonicNow(); try { synchronized (dataQueue) { while (!closed) { @@ -2018,6 +2030,11 @@ public class DFSOutputStream extends FSO checkClosed(); } catch (ClosedChannelException e) { } + long duration = Time.monotonicNow() - begin; + if (duration > dfsclientSlowLogThresholdMs) { + DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration + + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); + } } private synchronized void start() { @@ -2035,7 +2052,7 @@ public class DFSOutputStream extends FSO streamer.setLastException(new IOException("Lease timeout of " + (dfsClient.getHdfsTimeout()/1000) + " seconds expired.")); closeThreads(true); - dfsClient.endFileLease(src); + dfsClient.endFileLease(fileId); } // shutdown datastreamer and responseprocessor threads. @@ -2090,7 +2107,7 @@ public class DFSOutputStream extends FSO ExtendedBlock lastBlock = streamer.getBlock(); closeThreads(false); completeFile(lastBlock); - dfsClient.endFileLease(src); + dfsClient.endFileLease(fileId); } catch (ClosedChannelException e) { } finally { closed = true; @@ -2119,12 +2136,12 @@ public class DFSOutputStream extends FSO throw new IOException(msg); } try { - Thread.sleep(localTimeout); if (retries == 0) { throw new IOException("Unable to close file because the last block" + " does not have enough number of replicas."); } retries--; + Thread.sleep(localTimeout); localTimeout *= 2; if (Time.now() - localstart > 5000) { DFSClient.LOG.info("Could not complete " + src + " retrying..."); @@ -2184,7 +2201,12 @@ public class DFSOutputStream extends FSO } @VisibleForTesting - long getFileId() { + public long getFileId() { return fileId; } + + private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) { + System.arraycopy(srcs, 0, dsts, 0, skipIndex); + System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Aug 19 23:49:39 2014 @@ -33,6 +33,9 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY; import java.io.IOException; import java.io.PrintStream; @@ -288,9 +291,7 @@ public class DFSUtil { * <p> * Note that some components are only reserved under certain directories, e.g. * "/.reserved" is reserved, while "/hadoop/.reserved" is not. - * - * @param component - * @return if the component is reserved + * @return true, if the component is reserved */ public static boolean isReservedPathComponent(String component) { for (String reserved : HdfsConstants.RESERVED_PATH_COMPONENTS) { @@ -1015,8 +1016,8 @@ public class DFSUtil { /** * return server http or https address from the configuration for a * given namenode rpc address. - * @param conf * @param namenodeAddr - namenode RPC address + * @param conf configuration * @param scheme - the scheme (http / https) * @return server http or https address * @throws IOException @@ -1327,7 +1328,7 @@ public class DFSUtil { /** * For given set of {@code keys} adds nameservice Id and or namenode Id * and returns {nameserviceId, namenodeId} when address match is found. - * @see #getSuffixIDs(Configuration, String, AddressMatcher) + * @see #getSuffixIDs(Configuration, String, String, String, AddressMatcher) */ static String[] getSuffixIDs(final Configuration conf, final InetSocketAddress address, final String... keys) { @@ -1499,9 +1500,8 @@ public class DFSUtil { /** * Get SPNEGO keytab Key from configuration * - * @param conf - * Configuration - * @param defaultKey + * @param conf Configuration + * @param defaultKey default key to be used for config lookup * @return DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY if the key is not empty * else return defaultKey */ @@ -1534,16 +1534,38 @@ public class DFSUtil { .needsClientAuth( sslConf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY, DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT)) - .keyPassword(sslConf.get("ssl.server.keystore.keypassword")) + .keyPassword(getPassword(sslConf, DFS_SERVER_HTTPS_KEYPASSWORD_KEY)) .keyStore(sslConf.get("ssl.server.keystore.location"), - sslConf.get("ssl.server.keystore.password"), + getPassword(sslConf, DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY), sslConf.get("ssl.server.keystore.type", "jks")) .trustStore(sslConf.get("ssl.server.truststore.location"), - sslConf.get("ssl.server.truststore.password"), + getPassword(sslConf, DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY), sslConf.get("ssl.server.truststore.type", "jks")); } /** + * Leverages the Configuration.getPassword method to attempt to get + * passwords from the CredentialProvider API before falling back to + * clear text in config - if falling back is allowed. + * @param conf Configuration instance + * @param alias name of the credential to retreive + * @return String credential value or null + */ + static String getPassword(Configuration conf, String alias) { + String password = null; + try { + char[] passchars = conf.getPassword(alias); + if (passchars != null) { + password = new String(passchars); + } + } + catch (IOException ioe) { + password = null; + } + return password; + } + + /** * Converts a Date into an ISO-8601 formatted datetime string. */ public static String dateToIso8601String(Date date) { @@ -1646,9 +1668,11 @@ public class DFSUtil { .setKeytabConfKey(getSpnegoKeytabKey(conf, spnegoKeytabFileKey)); // initialize the webserver for uploading/downloading files. 
- LOG.info("Starting web server as: " - + SecurityUtil.getServerPrincipal(conf.get(spnegoUserNameKey), - httpAddr.getHostName())); + if (UserGroupInformation.isSecurityEnabled()) { + LOG.info("Starting web server as: " + + SecurityUtil.getServerPrincipal(conf.get(spnegoUserNameKey), + httpAddr.getHostName())); + } if (policy.isHttpEnabled()) { if (httpAddr.getPort() == 0) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Tue Aug 19 23:49:39 2014 @@ -25,6 +25,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -46,6 +47,7 @@ import org.apache.hadoop.fs.FsServerDefa import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; @@ -57,6 +59,7 @@ import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -66,14 +69,12 @@ import org.apache.hadoop.hdfs.protocol.C import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -83,7 +84,6 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; @@ -1140,7 +1140,7 @@ public class DistributedFileSystem exten @Override public FileChecksum doCall(final Path p) throws IOException, 
UnresolvedLinkException { - return dfs.getFileChecksum(getPathName(p)); + return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE); } @Override @@ -1152,6 +1152,32 @@ public class DistributedFileSystem exten } @Override + public FileChecksum getFileChecksum(Path f, final long length) + throws IOException { + statistics.incrementReadOps(1); + Path absF = fixRelativePart(f); + return new FileSystemLinkResolver<FileChecksum>() { + @Override + public FileChecksum doCall(final Path p) + throws IOException, UnresolvedLinkException { + return dfs.getFileChecksum(getPathName(p), length); + } + + @Override + public FileChecksum next(final FileSystem fs, final Path p) + throws IOException { + if (fs instanceof DistributedFileSystem) { + return ((DistributedFileSystem) fs).getFileChecksum(p, length); + } else { + throw new UnsupportedFileSystemException( + "getFileChecksum(Path, long) is not supported by " + + fs.getClass().getSimpleName()); + } + } + }.resolve(this, absF); + } + + @Override public void setPermission(Path p, final FsPermission permission ) throws IOException { statistics.incrementWriteOps(1); @@ -1429,7 +1455,7 @@ public class DistributedFileSystem exten * Get the difference between two snapshots, or between a snapshot and the * current tree of a directory. * - * @see DFSClient#getSnapshotDiffReport(Path, String, String) + * @see DFSClient#getSnapshotDiffReport(String, String, String) */ public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir, final String fromSnapshot, final String toSnapshot) throws IOException { @@ -1769,4 +1795,127 @@ public class DistributedFileSystem exten } }.resolve(this, absF); } + + @Override + public void setXAttr(Path path, final String name, final byte[] value, + final EnumSet<XAttrSetFlag> flag) throws IOException { + Path absF = fixRelativePart(path); + new FileSystemLinkResolver<Void>() { + + @Override + public Void doCall(final Path p) throws IOException { + dfs.setXAttr(getPathName(p), name, value, flag); + return null; + } + + @Override + public Void next(final FileSystem fs, final Path p) throws IOException { + fs.setXAttr(p, name, value, flag); + return null; + } + }.resolve(this, absF); + } + + @Override + public byte[] getXAttr(Path path, final String name) throws IOException { + final Path absF = fixRelativePart(path); + return new FileSystemLinkResolver<byte[]>() { + @Override + public byte[] doCall(final Path p) throws IOException { + return dfs.getXAttr(getPathName(p), name); + } + @Override + public byte[] next(final FileSystem fs, final Path p) + throws IOException, UnresolvedLinkException { + return fs.getXAttr(p, name); + } + }.resolve(this, absF); + } + + @Override + public Map<String, byte[]> getXAttrs(Path path) throws IOException { + final Path absF = fixRelativePart(path); + return new FileSystemLinkResolver<Map<String, byte[]>>() { + @Override + public Map<String, byte[]> doCall(final Path p) throws IOException { + return dfs.getXAttrs(getPathName(p)); + } + @Override + public Map<String, byte[]> next(final FileSystem fs, final Path p) + throws IOException, UnresolvedLinkException { + return fs.getXAttrs(p); + } + }.resolve(this, absF); + } + + @Override + public Map<String, byte[]> getXAttrs(Path path, final List<String> names) + throws IOException { + final Path absF = fixRelativePart(path); + return new FileSystemLinkResolver<Map<String, byte[]>>() { + @Override + public Map<String, byte[]> doCall(final Path p) throws IOException { + return dfs.getXAttrs(getPathName(p), names); + } + @Override + public 
Map<String, byte[]> next(final FileSystem fs, final Path p) + throws IOException, UnresolvedLinkException { + return fs.getXAttrs(p, names); + } + }.resolve(this, absF); + } + + @Override + public List<String> listXAttrs(Path path) + throws IOException { + final Path absF = fixRelativePart(path); + return new FileSystemLinkResolver<List<String>>() { + @Override + public List<String> doCall(final Path p) throws IOException { + return dfs.listXAttrs(getPathName(p)); + } + @Override + public List<String> next(final FileSystem fs, final Path p) + throws IOException, UnresolvedLinkException { + return fs.listXAttrs(p); + } + }.resolve(this, absF); + } + + @Override + public void removeXAttr(Path path, final String name) throws IOException { + Path absF = fixRelativePart(path); + new FileSystemLinkResolver<Void>() { + @Override + public Void doCall(final Path p) throws IOException { + dfs.removeXAttr(getPathName(p), name); + return null; + } + + @Override + public Void next(final FileSystem fs, final Path p) throws IOException { + fs.removeXAttr(p, name); + return null; + } + }.resolve(this, absF); + } + + @Override + public void access(Path path, final FsAction mode) throws IOException { + final Path absF = fixRelativePart(path); + new FileSystemLinkResolver<Void>() { + @Override + public Void doCall(final Path p) throws IOException { + dfs.checkAccess(getPathName(p), mode); + return null; + } + + @Override + public Void next(final FileSystem fs, final Path p) + throws IOException { + fs.access(p, mode); + return null; + } + }.resolve(this, absF); + } }
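The ResponseProcessor and waitForAckedSeqno hunks in the DFSOutputStream diff above time each pipeline ack read against dfsclientSlowLogThresholdMs (populated from the client's slow-IO warning threshold setting) and log a warning only when the elapsed time exceeds that threshold and the ack is not a heartbeat. A minimal standalone sketch of the same pattern, substituting System.nanoTime and java.util.logging for Hadoop's Time.monotonicNow and commons-logging; the threshold constant and logger name are illustrative, not the real defaults:

import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class SlowIoWarning {
  private static final Logger LOG = Logger.getLogger("SlowIoWarning");
  // Assumed value for illustration; the real client reads its configured slow-IO threshold.
  private static final long SLOW_IO_WARNING_THRESHOLD_MS = 30_000;

  static void timed(Runnable op, String what) {
    long begin = System.nanoTime();             // monotonic clock, immune to wall-clock jumps
    op.run();
    long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
    if (durationMs > SLOW_IO_WARNING_THRESHOLD_MS) {
      LOG.warning("Slow " + what + " took " + durationMs
          + "ms (threshold=" + SLOW_IO_WARNING_THRESHOLD_MS + "ms)");
    }
  }
}

Using a monotonic clock matters here: wall-clock time can jump under NTP adjustment and would produce spurious slow-IO warnings.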
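When a datanode in the write pipeline fails, the client rebuilds three parallel arrays (locations, storage types, storage IDs) with the entry at errorIndex removed; the generic arraycopy helper added at the end of the DFSOutputStream diff replaces the previously duplicated pairs of System.arraycopy calls. A short self-contained sketch of the same skip-index idiom, with class name and sample data chosen only for illustration:

import java.util.Arrays;

public class SkipIndexCopy {
  /** Copies srcs into dsts while skipping srcs[skipIndex]; dsts.length must be srcs.length - 1. */
  private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
    System.arraycopy(srcs, 0, dsts, 0, skipIndex);
    System.arraycopy(srcs, skipIndex + 1, dsts, skipIndex, dsts.length - skipIndex);
  }

  public static void main(String[] args) {
    String[] nodes = {"dn1", "dn2", "dn3", "dn4"};
    String[] remaining = new String[nodes.length - 1];
    arraycopy(nodes, remaining, 2);                   // drop the failed node at index 2
    System.out.println(Arrays.toString(remaining));   // prints [dn1, dn2, dn4]
  }
}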
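DFSUtil.getPassword in the diff above wraps Configuration.getPassword so that the HTTPS keystore and truststore passwords can be served by the CredentialProvider API, falling back to clear text in ssl-server.xml only when no provider supplies them. A hedged usage sketch: the alias is assumed to resolve to ssl.server.keystore.password (the property name used before this change), and the provider path shown in the comment is an example only:

import org.apache.hadoop.conf.Configuration;

public class KeystorePasswordLookup {
  public static void main(String[] args) throws Exception {
    Configuration sslConf = new Configuration(false);
    sslConf.addResource("ssl-server.xml");
    // Optionally point at a credential store; without one, getPassword falls back to the
    // clear-text property of the same name if it is present in the configuration.
    // sslConf.set("hadoop.security.credential.provider.path",
    //     "jceks://file/etc/hadoop/ssl.jceks");      // example path only
    char[] password = sslConf.getPassword("ssl.server.keystore.password");
    System.out.println(password == null ? "password not configured" : "password resolved");
  }
}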
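The DistributedFileSystem overrides above route the extended attribute calls (setXAttr, getXAttr, getXAttrs, listXAttrs, removeXAttr) and the new access(Path, FsAction) check through the same FileSystemLinkResolver pattern used elsewhere in the class, so symlinks resolve consistently. A brief client-side usage sketch against the generic FileSystem API; the path and xattr name are illustrative, the target file must already exist, and user-namespace xattrs require appropriate permissions:

import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsAction;

public class XAttrAccessExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/tmp/example.txt");              // illustrative path

    fs.setXAttr(p, "user.origin",
        "ingest-job".getBytes(StandardCharsets.UTF_8),
        EnumSet.of(XAttrSetFlag.CREATE));
    byte[] value = fs.getXAttr(p, "user.origin");
    System.out.println(new String(value, StandardCharsets.UTF_8));

    fs.access(p, FsAction.READ);                        // throws AccessControlException when denied
    fs.removeXAttr(p, "user.origin");
  }
}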