Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jul 25 20:33:09 2014 @@ -17,10 +17,68 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.protobuf.BlockingService; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; +import static org.apache.hadoop.util.ExitUtil.terminate; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.channels.SocketChannel; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.management.ObjectName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,17 +94,44 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.DomainPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.hadoop.hdfs.protocol.datatransfer.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; -import org.apache.hadoop.hdfs.protocolPB.*; -import org.apache.hadoop.hdfs.security.token.block.*; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -59,7 +144,11 @@ import org.apache.hadoop.hdfs.server.dat import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.http.HttpConfig; @@ -82,22 +171,21 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.*; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.JvmPauseMonitor; +import org.apache.hadoop.util.ServicePlugin; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.VersionInfo; import org.mortbay.util.ajax.JSON; -import javax.management.ObjectName; - -import java.io.*; -import java.lang.management.ManagementFactory; -import java.net.*; -import java.nio.channels.SocketChannel; -import java.security.PrivilegedExceptionAction; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; -import static org.apache.hadoop.util.ExitUtil.terminate; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; /********************************************************** * DataNode is a class (and program) that stores a set of @@ -224,6 +312,8 @@ public class DataNode extends Configured private final List<String> usersWithLocalPathAccess; private final boolean connectToDnViaHostname; ReadaheadPool readaheadPool; + SaslDataTransferClient saslClient; + SaslDataTransferServer saslServer; private final boolean getHdfsBlockLocationsEnabled; private ObjectName dataNodeInfoBeanName; private Thread checkDiskErrorThread = null; @@ -722,15 +812,10 @@ public class DataNode extends Configured */ void startDataNode(Configuration conf, List<StorageLocation> dataDirs, - // DatanodeProtocol namenode, SecureResources resources ) throws IOException { - if(UserGroupInformation.isSecurityEnabled() && resources == null) { - if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) { - throw new RuntimeException("Cannot start secure cluster without " - + "privileged resources."); - } - } + + checkSecureConfig(conf, resources); // settings global for all BPs in the Data Node this.secureResources = resources; @@ -790,6 +875,55 @@ public class DataNode extends Configured // Create the ReadaheadPool from the DataNode context so we can // exit without having to explicitly shutdown its thread pool. readaheadPool = ReadaheadPool.getInstance(); + saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver, + dnConf.trustedChannelResolver, + conf.getBoolean( + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); + saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); + } + + /** + * Checks if the DataNode has a secure configuration if security is enabled. + * There are 2 possible configurations that are considered secure: + * 1. The server has bound to privileged ports for RPC and HTTP via + * SecureDataNodeStarter. + * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no + * plain HTTP) for the HTTP server. The SASL handshake guarantees + * authentication of the RPC server before a client transmits a secret, such + * as a block access token. Similarly, SSL guarantees authentication of the + * HTTP server before a client transmits a secret, such as a delegation + * token. + * It is not possible to run with both privileged ports and SASL on + * DataTransferProtocol. For backwards-compatibility, the connection logic + * must check if the target port is a privileged port, and if so, skip the + * SASL handshake. + * + * @param conf Configuration to check + * @param resources SecuredResources obtained for DataNode + * @throws RuntimeException if security enabled, but configuration is insecure + */ + private static void checkSecureConfig(Configuration conf, + SecureResources resources) throws RuntimeException { + if (!UserGroupInformation.isSecurityEnabled()) { + return; + } + String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY); + if (resources != null && dataTransferProtection == null) { + return; + } + if (conf.getBoolean("ignore.secure.ports.for.testing", false)) { + return; + } + if (dataTransferProtection != null && + DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY && + resources == null) { + return; + } + throw new RuntimeException("Cannot start secure DataNode without " + + "configuring either privileged resources or SASL RPC data transfer " + + "protection and SSL for HTTP. Using privileged resources in " + + "combination with SASL RPC data transfer protection is not supported."); } public static String generateUuid() { @@ -1423,8 +1557,8 @@ public class DataNode extends Configured return xmitsInProgress.get(); } - private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[]) - throws IOException { + private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, + StorageType[] xferTargetStorageTypes) throws IOException { BPOfferService bpos = getBPOSForBlock(block); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); @@ -1460,16 +1594,17 @@ public class DataNode extends Configured LOG.info(bpReg + " Starting thread to transfer " + block + " to " + xfersBuilder); - new Daemon(new DataTransfer(xferTargets, block, + new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block, BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start(); } } void transferBlocks(String poolId, Block blocks[], - DatanodeInfo xferTargets[][]) { + DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) { for (int i = 0; i < blocks.length; i++) { try { - transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]); + transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i], + xferTargetStorageTypes[i]); } catch (IOException ie) { LOG.warn("Failed to transfer block " + blocks[i], ie); } @@ -1572,6 +1707,7 @@ public class DataNode extends Configured */ private class DataTransfer implements Runnable { final DatanodeInfo[] targets; + final StorageType[] targetStorageTypes; final ExtendedBlock b; final BlockConstructionStage stage; final private DatanodeRegistration bpReg; @@ -1582,7 +1718,8 @@ public class DataNode extends Configured * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. */ - DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage, + DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, + ExtendedBlock b, BlockConstructionStage stage, final String clientname) { if (DataTransferProtocol.LOG.isDebugEnabled()) { DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": " @@ -1592,6 +1729,7 @@ public class DataNode extends Configured + ", targests=" + Arrays.asList(targets)); } this.targets = targets; + this.targetStorageTypes = targetStorageTypes; this.b = b; this.stage = stage; BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); @@ -1623,20 +1761,25 @@ public class DataNode extends Configured NetUtils.connect(sock, curTarget, dnConf.socketTimeout); sock.setSoTimeout(targets.length * dnConf.socketTimeout); + // + // Header info + // + Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN; + if (isBlockTokenEnabled) { + accessToken = blockPoolTokenSecretManager.generateToken(b, + EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)); + } + long writeTimeout = dnConf.socketWriteTimeout + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); - if (dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, - blockPoolTokenSecretManager.generateDataEncryptionKey( - b.getBlockPoolId())); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } + DataEncryptionKeyFactory keyFactory = + getDataEncryptionKeyFactoryForBlock(b); + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, keyFactory, accessToken, bpReg); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -1645,16 +1788,8 @@ public class DataNode extends Configured false, false, true, DataNode.this, null, cachingStrategy); DatanodeInfo srcNode = new DatanodeInfo(bpReg); - // - // Header info - // - Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN; - if (isBlockTokenEnabled) { - accessToken = blockPoolTokenSecretManager.generateToken(b, - EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)); - } - - new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode, + new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, + clientname, targets, targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy); // send data & checksum @@ -1696,7 +1831,26 @@ public class DataNode extends Configured } } } - + + /** + * Returns a new DataEncryptionKeyFactory that generates a key from the + * BlockPoolTokenSecretManager, using the block pool ID of the given block. + * + * @param block for which the factory needs to create a key + * @return DataEncryptionKeyFactory for block's block pool ID + */ + DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock( + final ExtendedBlock block) { + return new DataEncryptionKeyFactory() { + @Override + public DataEncryptionKey newDataEncryptionKey() { + return dnConf.encryptDataTransfer ? + blockPoolTokenSecretManager.generateDataEncryptionKey( + block.getBlockPoolId()) : null; + } + }; + } + /** * After a block becomes finalized, a datanode increases metric counter, * notifies namenode, and adds it to the block scanner @@ -2336,7 +2490,8 @@ public class DataNode extends Configured * @param client client name */ void transferReplicaForPipelineRecovery(final ExtendedBlock b, - final DatanodeInfo[] targets, final String client) throws IOException { + final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, + final String client) throws IOException { final long storedGS; final long visible; final BlockConstructionStage stage; @@ -2369,7 +2524,7 @@ public class DataNode extends Configured b.setNumBytes(visible); if (targets.length > 0) { - new DataTransfer(targets, b, stage, client).run(); + new DataTransfer(targets, targetStorageTypes, b, stage, client).run(); } }
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jul 25 20:33:09 2014 @@ -36,29 +36,27 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; -import java.net.UnknownHostException; import java.nio.channels.ClosedChannelException; import java.security.MessageDigest; import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; @@ -85,7 +83,6 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.util.DataChecksum; import com.google.common.base.Preconditions; -import com.google.common.net.InetAddresses; import com.google.protobuf.ByteString; @@ -174,24 +171,11 @@ class DataXceiver extends Receiver imple dataXceiverServer.addPeer(peer, Thread.currentThread()); peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); InputStream input = socketIn; - if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){ - IOStreamPair encryptedStreams = null; - try { - encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut, - socketIn, datanode.blockPoolTokenSecretManager, - dnConf.encryptionAlgorithm); - } catch (InvalidMagicNumberException imne) { - LOG.info("Failed to read expected encryption handshake from client " + - "at " + peer.getRemoteAddressString() + ". Perhaps the client " + - "is running an older version of Hadoop which does not support " + - "encryption"); - return; - } - input = encryptedStreams.in; - socketOut = encryptedStreams.out; - } - input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE); + IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut, + socketIn, datanode.getDatanodeId()); + input = new BufferedInputStream(saslStreams.in, + HdfsConstants.SMALL_BUFFER_SIZE); + socketOut = saslStreams.out; super.initialize(new DataInputStream(input)); @@ -263,19 +247,6 @@ class DataXceiver extends Receiver imple } } } - - /** - * Returns InetAddress from peer - * The getRemoteAddressString is the form /ip-address:port - * The ip-address is extracted from peer and InetAddress is formed - * @param peer - * @return - * @throws UnknownHostException - */ - private static InetAddress getClientAddress(Peer peer) { - return InetAddresses.forString( - peer.getRemoteAddressString().split(":")[0].substring(1)); - } @Override public void requestShortCircuitFds(final ExtendedBlock blk, @@ -554,9 +525,11 @@ class DataXceiver extends Receiver imple @Override public void writeBlock(final ExtendedBlock block, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientname, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo srcDataNode, final BlockConstructionStage stage, final int pipelineSize, @@ -620,12 +593,13 @@ class DataXceiver extends Receiver imple if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // open a block receiver - blockReceiver = new BlockReceiver(block, in, + blockReceiver = new BlockReceiver(block, storageType, in, peer.getRemoteAddressString(), peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum, cachingStrategy); + storageUuid = blockReceiver.getStorageUuid(); } else { storageUuid = datanode.data.recoverClose( @@ -656,25 +630,20 @@ class DataXceiver extends Receiver imple OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, writeTimeout); InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); - if (dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufMirrorOut, unbufMirrorIn, - datanode.blockPoolTokenSecretManager - .generateDataEncryptionKey(block.getBlockPoolId())); - - unbufMirrorOut = encryptedStreams.out; - unbufMirrorIn = encryptedStreams.in; - } + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock, + unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]); + unbufMirrorOut = saslStreams.out; + unbufMirrorIn = saslStreams.in; mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, HdfsConstants.SMALL_BUFFER_SIZE)); mirrorIn = new DataInputStream(unbufMirrorIn); - new Sender(mirrorOut).writeBlock(originalBlock, blockToken, - clientname, targets, srcDataNode, stage, pipelineSize, - minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, - cachingStrategy); + new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], + blockToken, clientname, targets, targetStorageTypes, srcDataNode, + stage, pipelineSize, minBytesRcvd, maxBytesRcvd, + latestGenerationStamp, requestedChecksum, cachingStrategy); mirrorOut.flush(); @@ -789,7 +758,8 @@ class DataXceiver extends Receiver imple public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; @@ -798,7 +768,8 @@ class DataXceiver extends Receiver imple final DataOutputStream out = new DataOutputStream( getOutputStream()); try { - datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); + datanode.transferReplicaForPipelineRecovery(blk, targets, + targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); } finally { IOUtils.closeStream(out); @@ -976,6 +947,7 @@ class DataXceiver extends Receiver imple @Override public void replaceBlock(final ExtendedBlock block, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo proxySource) throws IOException { @@ -1026,17 +998,12 @@ class DataXceiver extends Receiver imple OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock, dnConf.socketWriteTimeout); InputStream unbufProxyIn = NetUtils.getInputStream(proxySock); - if (dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted( - proxySock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufProxyOut, unbufProxyIn, - datanode.blockPoolTokenSecretManager - .generateDataEncryptionKey(block.getBlockPoolId())); - unbufProxyOut = encryptedStreams.out; - unbufProxyIn = encryptedStreams.in; - } + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock, + unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource); + unbufProxyOut = saslStreams.out; + unbufProxyIn = saslStreams.in; proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -1066,8 +1033,8 @@ class DataXceiver extends Receiver imple DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); // open a block receiver and check if the block does not exist - blockReceiver = new BlockReceiver( - block, proxyReply, proxySock.getRemoteSocketAddress().toString(), + blockReceiver = new BlockReceiver(block, storageType, + proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0, 0, 0, "", null, datanode, remoteChecksum, CachingStrategy.newDropBehind()); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Fri Jul 25 20:33:09 2014 @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; @@ -176,8 +177,8 @@ public interface FsDatasetSpi<V extends * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createTemporary(ExtendedBlock b - ) throws IOException; + public ReplicaInPipelineInterface createTemporary(StorageType storageType, + ExtendedBlock b) throws IOException; /** * Creates a RBW replica and returns the meta info of the replica @@ -186,8 +187,8 @@ public interface FsDatasetSpi<V extends * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createRbw(ExtendedBlock b - ) throws IOException; + public ReplicaInPipelineInterface createRbw(StorageType storageType, + ExtendedBlock b) throws IOException; /** * Recovers a RBW replica and returns the meta info of the replica Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Fri Jul 25 20:33:09 2014 @@ -17,6 +17,28 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -24,12 +46,37 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; -import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.datanode.*; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.*; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; +import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; +import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -43,15 +90,6 @@ import org.apache.hadoop.util.DiskChecke import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; -import javax.management.StandardMBean; -import java.io.*; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.*; -import java.util.concurrent.Executor; - /************************************************** * FSDataset manages a set of data blocks. Each block * has a unique name and an extent on disk. @@ -736,8 +774,8 @@ class FsDatasetImpl implements FsDataset } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createRbw(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipeline createRbw(StorageType storageType, + ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { @@ -746,7 +784,7 @@ class FsDatasetImpl implements FsDataset " and thus cannot be created."); } // create a new block - FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); + FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); // create a rbw file to hold block in the designated volume File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), @@ -874,8 +912,8 @@ class FsDatasetImpl implements FsDataset } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipeline createTemporary(StorageType storageType, + ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + @@ -883,7 +921,7 @@ class FsDatasetImpl implements FsDataset " and thus cannot be created."); } - FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); + FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); // create a temporary file to hold block in the designated volume File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Fri Jul 25 20:33:09 2014 @@ -18,13 +18,17 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; -import org.apache.hadoop.util.Time; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.Time; class FsVolumeList { /** @@ -52,11 +56,18 @@ class FsVolumeList { * by a single thread and next volume is chosen with no concurrent * update to {@link #volumes}. * @param blockSize free space needed on the volume + * @param storageType the desired {@link StorageType} * @return next volume to store the block in. */ - // TODO should choose volume with storage type - synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException { - return blockChooser.chooseVolume(volumes, blockSize); + synchronized FsVolumeImpl getNextVolume(StorageType storageType, + long blockSize) throws IOException { + final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size()); + for(FsVolumeImpl v : volumes) { + if (v.getStorageType() == storageType) { + list.add(v); + } + } + return blockChooser.chooseVolume(list, blockSize); } long getDfsUsed() throws IOException { Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Fri Jul 25 20:33:09 2014 @@ -128,7 +128,8 @@ public class DatanodeWebHdfsMethods { "://" + nnId); boolean isLogical = HAUtil.isLogicalUri(conf, nnUri); if (isLogical) { - token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri)); + token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri, + HdfsConstants.HDFS_URI_SCHEME)); } else { token.setService(SecurityUtil.buildTokenService(nnUri)); } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Jul 25 20:33:09 2014 @@ -48,6 +48,15 @@ public interface FSClusterStats { * @return Number of datanodes that are both alive and not decommissioned. */ public int getNumDatanodesInService(); + + /** + * an indication of the average load of non-decommission(ing|ed) nodes + * eligible for block placement + * + * @return average of the in service number of block transfers and block + * writes that are currently occurring on the cluster. + */ + public double getInServiceXceiverAverage(); } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jul 25 20:33:09 2014 @@ -1074,10 +1074,11 @@ public class FSEditLog implements LogsPu logEdit(op); } - void logRemoveXAttrs(String src, List<XAttr> xAttrs) { + void logRemoveXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) { final RemoveXAttrOp op = RemoveXAttrOp.getInstance(); op.src = src; op.xAttrs = xAttrs; + logRpcIds(op, toLogRpcIds); logEdit(op); } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Jul 25 20:33:09 2014 @@ -821,6 +821,10 @@ public class FSEditLogLoader { RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op; fsDir.unprotectedRemoveXAttrs(removeXAttrOp.src, removeXAttrOp.xAttrs); + if (toAddRetryCache) { + fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId, + removeXAttrOp.rpcCallId); + } break; } default: Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Fri Jul 25 20:33:09 2014 @@ -3551,6 +3551,7 @@ public abstract class FSEditLogOp { XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in); src = p.getSrc(); xAttrs = PBHelper.convertXAttrs(p.getXAttrsList()); + readRpcIds(in, logVersion); } @Override @@ -3561,18 +3562,22 @@ public abstract class FSEditLogOp { } b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs)); b.build().writeDelimitedTo(out); + // clientId and callId + writeRpcIds(rpcClientId, rpcCallId, out); } @Override protected void toXml(ContentHandler contentHandler) throws SAXException { XMLUtils.addSaxString(contentHandler, "SRC", src); appendXAttrsToXml(contentHandler, xAttrs); + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @Override void fromXml(Stanza st) throws InvalidXmlException { src = st.getValue("SRC"); xAttrs = readXAttrsFromXml(st); + readRpcIdsFromXml(st); } } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 25 20:33:09 2014 @@ -225,6 +225,7 @@ public class FSImage implements Closeabl NNStorage.checkVersionUpgradable(storage.getLayoutVersion()); } if (startOpt != StartupOption.UPGRADE + && startOpt != StartupOption.UPGRADEONLY && !RollingUpgradeStartupOption.STARTED.matches(startOpt) && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION && layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) { @@ -263,6 +264,7 @@ public class FSImage implements Closeabl // 3. Do transitions switch(startOpt) { case UPGRADE: + case UPGRADEONLY: doUpgrade(target); return false; // upgrade saved image already case IMPORT: @@ -748,11 +750,13 @@ public class FSImage implements Closeabl editLog.recoverUnclosedStreams(); } else if (HAUtil.isHAEnabled(conf, nameserviceId) && (startOpt == StartupOption.UPGRADE + || startOpt == StartupOption.UPGRADEONLY || RollingUpgradeStartupOption.ROLLBACK.matches(startOpt))) { // This NN is HA, but we're doing an upgrade or a rollback of rolling // upgrade so init the edit log for write. editLog.initJournalsForWrite(); - if (startOpt == StartupOption.UPGRADE) { + if (startOpt == StartupOption.UPGRADE + || startOpt == StartupOption.UPGRADEONLY) { long sharedLogCTime = editLog.getSharedLogCTime(); if (this.storage.getCTime() < sharedLogCTime) { throw new IOException("It looks like the shared log is already " + Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Fri Jul 25 20:33:09 2014 @@ -614,6 +614,16 @@ public class FSImageFormat { INodeDirectory parentINode = fsDir.rootDir; for (long i = 0; i < numFiles; i++) { pathComponents = FSImageSerialization.readPathComponents(in); + for (int j=0; j < pathComponents.length; j++) { + byte[] newComponent = renameReservedComponentOnUpgrade + (pathComponents[j], getLayoutVersion()); + if (!Arrays.equals(newComponent, pathComponents[j])) { + String oldPath = DFSUtil.byteArray2PathString(pathComponents); + pathComponents[j] = newComponent; + String newPath = DFSUtil.byteArray2PathString(pathComponents); + LOG.info("Renaming reserved path " + oldPath + " to " + newPath); + } + } final INode newNode = loadINode( pathComponents[pathComponents.length-1], false, in, counter); @@ -926,6 +936,7 @@ public class FSImageFormat { oldnode = namesystem.dir.getInode(cons.getId()).asFile(); inSnapshot = true; } else { + path = renameReservedPathsOnUpgrade(path, getLayoutVersion()); final INodesInPath iip = fsDir.getLastINodeInPath(path); oldnode = INodeFile.valueOf(iip.getINode(0), path); } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 25 20:33:09 2014 @@ -83,6 +83,9 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT; + import static org.apache.hadoop.util.Time.now; import java.io.BufferedWriter; @@ -528,6 +531,8 @@ public class FSNamesystem implements Nam private final FSImage fsImage; + private boolean randomizeBlockLocationsPerBlock; + /** * Notify that loading of this FSDirectory is complete, and * it is imageLoaded for use @@ -837,6 +842,10 @@ public class FSNamesystem implements Nam alwaysUseDelegationTokensForTests = conf.getBoolean( DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT); + + this.randomizeBlockLocationsPerBlock = conf.getBoolean( + DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK, + DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT); this.dtSecretManager = createDelegationTokenSecretManager(conf); this.dir = new FSDirectory(this, conf); @@ -979,7 +988,8 @@ public class FSNamesystem implements Nam } // This will start a new log segment and write to the seen_txid file, so // we shouldn't do it when coming up in standby state - if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) { + if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE) + || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) { fsImage.openEditLogForWrite(); } success = true; @@ -1699,17 +1709,17 @@ public class FSNamesystem implements Nam LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true, true); if (blocks != null) { - blockManager.getDatanodeManager().sortLocatedBlocks( - clientMachine, blocks.getLocatedBlocks()); - + blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine, + blocks.getLocatedBlocks(), randomizeBlockLocationsPerBlock); + // lastBlock is not part of getLocatedBlocks(), might need to sort it too LocatedBlock lastBlock = blocks.getLastLocatedBlock(); if (lastBlock != null) { ArrayList<LocatedBlock> lastBlockList = Lists.newArrayListWithCapacity(1); lastBlockList.add(lastBlock); - blockManager.getDatanodeManager().sortLocatedBlocks( - clientMachine, lastBlockList); + blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine, + lastBlockList, randomizeBlockLocationsPerBlock); } } return blocks; @@ -7319,7 +7329,18 @@ public class FSNamesystem implements Nam @Override // FSClusterStats public int getNumDatanodesInService() { - return getNumLiveDataNodes() - getNumDecomLiveDataNodes(); + return datanodeStatistics.getNumDatanodesInService(); + } + + @Override // for block placement strategy + public double getInServiceXceiverAverage() { + double avgLoad = 0; + final int nodes = getNumDatanodesInService(); + if (nodes != 0) { + final int xceivers = datanodeStatistics.getInServiceXceiverCount(); + avgLoad = (double)xceivers/nodes; + } + return avgLoad; } public SnapshotManager getSnapshotManager() { @@ -8258,11 +8279,12 @@ public class FSNamesystem implements Nam nnConf.checkXAttrsConfigFlag(); FSPermissionChecker pc = getPermissionChecker(); boolean getAll = xAttrs == null || xAttrs.isEmpty(); - List<XAttr> filteredXAttrs = null; if (!getAll) { - filteredXAttrs = XAttrPermissionFilter.filterXAttrsForApi(pc, xAttrs); - if (filteredXAttrs.isEmpty()) { - return filteredXAttrs; + try { + XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs); + } catch (AccessControlException e) { + logAuditEvent(false, "getXAttrs", src); + throw e; } } checkOperation(OperationCategory.READ); @@ -8281,15 +8303,21 @@ public class FSNamesystem implements Nam if (filteredAll == null || filteredAll.isEmpty()) { return null; } - List<XAttr> toGet = Lists.newArrayListWithCapacity(filteredXAttrs.size()); - for (XAttr xAttr : filteredXAttrs) { + List<XAttr> toGet = Lists.newArrayListWithCapacity(xAttrs.size()); + for (XAttr xAttr : xAttrs) { + boolean foundIt = false; for (XAttr a : filteredAll) { if (xAttr.getNameSpace() == a.getNameSpace() && xAttr.getName().equals(a.getName())) { toGet.add(a); + foundIt = true; break; } } + if (!foundIt) { + throw new IOException( + "At least one of the attributes provided was not found."); + } } return toGet; } @@ -8323,17 +8351,42 @@ public class FSNamesystem implements Nam readUnlock(); } } - + + /** + * Remove an xattr for a file or directory. + * + * @param src + * - path to remove the xattr from + * @param xAttr + * - xAttr to remove + * @throws AccessControlException + * @throws SafeModeException + * @throws UnresolvedLinkException + * @throws IOException + */ void removeXAttr(String src, XAttr xAttr) throws IOException { - nnConf.checkXAttrsConfigFlag(); - HdfsFileStatus resultingStat = null; - FSPermissionChecker pc = getPermissionChecker(); + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } + boolean success = false; try { - XAttrPermissionFilter.checkPermissionForApi(pc, xAttr); + removeXAttrInt(src, xAttr, cacheEntry != null); + success = true; } catch (AccessControlException e) { logAuditEvent(false, "removeXAttr", src); throw e; + } finally { + RetryCache.setState(cacheEntry, success); } + } + + void removeXAttrInt(String src, XAttr xAttr, boolean logRetryCache) + throws IOException { + nnConf.checkXAttrsConfigFlag(); + HdfsFileStatus resultingStat = null; + FSPermissionChecker pc = getPermissionChecker(); + XAttrPermissionFilter.checkPermissionForApi(pc, xAttr); checkOperation(OperationCategory.WRITE); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); writeLock(); @@ -8347,12 +8400,12 @@ public class FSNamesystem implements Nam xAttrs.add(xAttr); List<XAttr> removedXAttrs = dir.removeXAttrs(src, xAttrs); if (removedXAttrs != null && !removedXAttrs.isEmpty()) { - getEditLog().logRemoveXAttrs(src, removedXAttrs); + getEditLog().logRemoveXAttrs(src, removedXAttrs, logRetryCache); + } else { + throw new IOException( + "No matching attributes found for remove operation"); } resultingStat = getAuditFileInfo(src, false); - } catch (AccessControlException e) { - logAuditEvent(false, "removeXAttr", src); - throw e; } finally { writeUnlock(); } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Jul 25 20:33:09 2014 @@ -71,6 +71,8 @@ public class FileJournalManager implemen NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)"); private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile( NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)"); + private static final Pattern EDITS_INPROGRESS_STALE_REGEX = Pattern.compile( + NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+).*(\\S+)"); private File currentInProgress = null; @@ -162,8 +164,7 @@ public class FileJournalManager implemen throws IOException { LOG.info("Purging logs older than " + minTxIdToKeep); File[] files = FileUtil.listFiles(sd.getCurrentDir()); - List<EditLogFile> editLogs = - FileJournalManager.matchEditLogs(files); + List<EditLogFile> editLogs = matchEditLogs(files, true); for (EditLogFile log : editLogs) { if (log.getFirstTxId() < minTxIdToKeep && log.getLastTxId() < minTxIdToKeep) { @@ -244,8 +245,13 @@ public class FileJournalManager implemen public static List<EditLogFile> matchEditLogs(File logDir) throws IOException { return matchEditLogs(FileUtil.listFiles(logDir)); } - + static List<EditLogFile> matchEditLogs(File[] filesInStorage) { + return matchEditLogs(filesInStorage, false); + } + + private static List<EditLogFile> matchEditLogs(File[] filesInStorage, + boolean forPurging) { List<EditLogFile> ret = Lists.newArrayList(); for (File f : filesInStorage) { String name = f.getName(); @@ -256,6 +262,7 @@ public class FileJournalManager implemen long startTxId = Long.parseLong(editsMatch.group(1)); long endTxId = Long.parseLong(editsMatch.group(2)); ret.add(new EditLogFile(f, startTxId, endTxId)); + continue; } catch (NumberFormatException nfe) { LOG.error("Edits file " + f + " has improperly formatted " + "transaction ID"); @@ -270,12 +277,30 @@ public class FileJournalManager implemen long startTxId = Long.parseLong(inProgressEditsMatch.group(1)); ret.add( new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true)); + continue; } catch (NumberFormatException nfe) { LOG.error("In-progress edits file " + f + " has improperly " + "formatted transaction ID"); // skip } } + if (forPurging) { + // Check for in-progress stale edits + Matcher staleInprogressEditsMatch = EDITS_INPROGRESS_STALE_REGEX + .matcher(name); + if (staleInprogressEditsMatch.matches()) { + try { + long startTxId = Long.valueOf(staleInprogressEditsMatch.group(1)); + ret.add(new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, + true)); + continue; + } catch (NumberFormatException nfe) { + LOG.error("In-progress stale edits file " + f + " has improperly " + + "formatted transaction ID"); + // skip + } + } + } } return ret; } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Fri Jul 25 20:33:09 2014 @@ -836,7 +836,7 @@ public class NNStorage extends Storage i */ void processStartupOptionsForUpgrade(StartupOption startOpt, int layoutVersion) throws IOException { - if (startOpt == StartupOption.UPGRADE) { + if (startOpt == StartupOption.UPGRADE || startOpt == StartupOption.UPGRADEONLY) { // If upgrade from a release that does not support federation, // if clusterId is provided in the startupOptions use it. // Else generate a new cluster ID Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jul 25 20:33:09 2014 @@ -210,6 +210,9 @@ public class NameNode implements NameNod + StartupOption.UPGRADE.getName() + " [" + StartupOption.CLUSTERID.getName() + " cid]" + " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t[" + + StartupOption.UPGRADEONLY.getName() + + " [" + StartupOption.CLUSTERID.getName() + " cid]" + + " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t[" + StartupOption.ROLLBACK.getName() + "] | \n\t[" + StartupOption.ROLLINGUPGRADE.getName() + " <" + RollingUpgradeStartupOption.DOWNGRADE.name().toLowerCase() + "|" @@ -713,6 +716,7 @@ public class NameNode implements NameNod * <li>{@link StartupOption#BACKUP BACKUP} - start backup node</li> * <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li> * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster + * <li>{@link StartupOption#UPGRADEONLY UPGRADEONLY} - upgrade the cluster * upgrade and create a snapshot of the current file system state</li> * <li>{@link StartupOption#RECOVER RECOVERY} - recover name node * metadata</li> @@ -767,7 +771,8 @@ public class NameNode implements NameNod } protected HAState createHAState(StartupOption startOpt) { - if (!haEnabled || startOpt == StartupOption.UPGRADE) { + if (!haEnabled || startOpt == StartupOption.UPGRADE + || startOpt == StartupOption.UPGRADEONLY) { return ACTIVE_STATE; } else { return STANDBY_STATE; @@ -1198,8 +1203,10 @@ public class NameNode implements NameNod startOpt = StartupOption.BACKUP; } else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) { startOpt = StartupOption.CHECKPOINT; - } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) { - startOpt = StartupOption.UPGRADE; + } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) + || StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) { + startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ? + StartupOption.UPGRADE : StartupOption.UPGRADEONLY; /* Can be followed by CLUSTERID with a required parameter or * RENAMERESERVED with an optional parameter */ @@ -1407,6 +1414,12 @@ public class NameNode implements NameNod terminate(0); return null; // avoid javac warning } + case UPGRADEONLY: { + DefaultMetricsSystem.initialize("NameNode"); + new NameNode(conf); + terminate(0); + return null; + } default: { DefaultMetricsSystem.initialize("NameNode"); return new NameNode(conf); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Jul 25 20:33:09 2014 @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; @@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.H import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; @@ -65,6 +75,7 @@ import org.apache.hadoop.net.NetworkTopo import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; @@ -92,7 +103,7 @@ import com.google.common.annotations.Vis * factors of each file. */ @InterfaceAudience.Private -public class NamenodeFsck { +public class NamenodeFsck implements DataEncryptionKeyFactory { public static final Log LOG = LogFactory.getLog(NameNode.class.getName()); // return string marking fsck status @@ -115,6 +126,7 @@ public class NamenodeFsck { private boolean showBlocks = false; private boolean showLocations = false; private boolean showRacks = false; + private boolean showprogress = false; private boolean showCorruptFileBlocks = false; /** @@ -149,6 +161,7 @@ public class NamenodeFsck { private List<String> snapshottableDirs = null; private final BlockPlacementPolicy bpPolicy; + private final SaslDataTransferClient saslClient; /** * Filesystem checker. @@ -175,6 +188,12 @@ public class NamenodeFsck { networktopology, namenode.getNamesystem().getBlockManager().getDatanodeManager() .getHost2DatanodeMap()); + this.saslClient = new SaslDataTransferClient( + DataTransferSaslUtil.getSaslPropertiesResolver(conf), + TrustedChannelResolver.getInstance(conf), + conf.getBoolean( + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) { String key = it.next(); @@ -185,6 +204,7 @@ public class NamenodeFsck { else if (key.equals("blocks")) { this.showBlocks = true; } else if (key.equals("locations")) { this.showLocations = true; } else if (key.equals("racks")) { this.showRacks = true; } + else if (key.equals("showprogress")) { this.showprogress = true; } else if (key.equals("openforwrite")) {this.showOpenFiles = true; } else if (key.equals("listcorruptfileblocks")) { this.showCorruptFileBlocks = true; @@ -363,10 +383,13 @@ public class NamenodeFsck { } else if (showFiles) { out.print(path + " " + fileLen + " bytes, " + blocks.locatedBlockCount() + " block(s): "); - } else { + } else if (showprogress) { out.print('.'); } - if (res.totalFiles % 100 == 0) { out.println(); out.flush(); } + if ((showprogress) && res.totalFiles % 100 == 0) { + out.println(); + out.flush(); + } int missing = 0; int corrupt = 0; long missize = 0; @@ -616,15 +639,16 @@ public class NamenodeFsck { setConfiguration(namenode.conf). setRemotePeerFactory(new RemotePeerFactory() { @Override - public Peer newConnectedPeer(InetSocketAddress addr) + public Peer newConnectedPeer(InetSocketAddress addr, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); try { s.connect(addr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); - peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer(). - getDataEncryptionKey()); + peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s, + NamenodeFsck.this, blockToken, datanodeId); } finally { if (peer == null) { IOUtils.closeQuietly(s); @@ -663,7 +687,12 @@ public class NamenodeFsck { throw new Exception("Could not copy block data for " + lblock.getBlock()); } } - + + @Override + public DataEncryptionKey newDataEncryptionKey() throws IOException { + return namenode.getRpcServer().getDataEncryptionKey(); + } + /* * XXX (ab) See comment above for copyBlock(). * Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java Fri Jul 25 20:33:09 2014 @@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.XAttrHelpe import org.apache.hadoop.security.AccessControlException; import com.google.common.collect.Lists; +import com.google.common.base.Preconditions; /** * There are four types of extended attributes <XAttr> defined by the @@ -60,8 +61,20 @@ public class XAttrPermissionFilter { throw new AccessControlException("User doesn't have permission for xattr: " + XAttrHelper.getPrefixName(xAttr)); } - - static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc, + + static void checkPermissionForApi(FSPermissionChecker pc, + List<XAttr> xAttrs) throws AccessControlException { + Preconditions.checkArgument(xAttrs != null); + if (xAttrs.isEmpty()) { + return; + } + + for (XAttr xAttr : xAttrs) { + checkPermissionForApi(pc, xAttr); + } + } + + static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc, List<XAttr> xAttrs) { assert xAttrs != null : "xAttrs can not be null"; if (xAttrs == null || xAttrs.isEmpty()) { Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java Fri Jul 25 20:33:09 2014 @@ -19,24 +19,30 @@ package org.apache.hadoop.hdfs.server.namenode; import java.util.List; +import java.util.Map; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; -import org.apache.hadoop.hdfs.server.namenode.INode; - -import com.google.common.collect.ImmutableList; /** * XAttrStorage is used to read and set xattrs for an inode. */ @InterfaceAudience.Private public class XAttrStorage { - + + private static final Map<String, String> internedNames = Maps.newHashMap(); + /** * Reads the existing extended attributes of an inode. If the * inode does not have an <code>XAttr</code>, then this method * returns an empty list. + * <p/> + * Must be called while holding the FSDirectory read lock. + * * @param inode INode to read * @param snapshotId * @return List<XAttr> <code>XAttr</code> list. @@ -48,6 +54,9 @@ public class XAttrStorage { /** * Reads the existing extended attributes of an inode. + * <p/> + * Must be called while holding the FSDirectory read lock. + * * @param inode INode to read. * @return List<XAttr> <code>XAttr</code> list. */ @@ -58,6 +67,9 @@ public class XAttrStorage { /** * Update xattrs of inode. + * <p/> + * Must be called while holding the FSDirectory write lock. + * * @param inode INode to update * @param xAttrs to update xAttrs. * @param snapshotId id of the latest snapshot of the inode @@ -70,8 +82,24 @@ public class XAttrStorage { } return; } - - ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(xAttrs); + // Dedupe the xAttr name and save them into a new interned list + List<XAttr> internedXAttrs = Lists.newArrayListWithCapacity(xAttrs.size()); + for (XAttr xAttr : xAttrs) { + final String name = xAttr.getName(); + String internedName = internedNames.get(name); + if (internedName == null) { + internedName = name; + internedNames.put(internedName, internedName); + } + XAttr internedXAttr = new XAttr.Builder() + .setName(internedName) + .setNameSpace(xAttr.getNameSpace()) + .setValue(xAttr.getValue()) + .build(); + internedXAttrs.add(internedXAttr); + } + // Save the list of interned xattrs + ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(internedXAttrs); if (inode.getXAttrFeature() != null) { inode.removeXAttrFeature(snapshotId); } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java Fri Jul 25 20:33:09 2014 @@ -81,6 +81,7 @@ public class BootstrapStandby implements private boolean force = false; private boolean interactive = true; + private boolean skipSharedEditsCheck = false; // Exit/return codes. static final int ERR_CODE_FAILED_CONNECT = 2; @@ -117,6 +118,8 @@ public class BootstrapStandby implements force = true; } else if ("-nonInteractive".equals(arg)) { interactive = false; + } else if ("-skipSharedEditsCheck".equals(arg)) { + skipSharedEditsCheck = true; } else { printUsage(); throw new HadoopIllegalArgumentException( @@ -127,7 +130,7 @@ public class BootstrapStandby implements private void printUsage() { System.err.println("Usage: " + this.getClass().getSimpleName() + - "[-force] [-nonInteractive]"); + " [-force] [-nonInteractive] [-skipSharedEditsCheck]"); } private NamenodeProtocol createNNProtocolProxy() @@ -200,7 +203,7 @@ public class BootstrapStandby implements // Ensure that we have enough edits already in the shared directory to // start up from the last checkpoint on the active. - if (!checkLogsAvailableForRead(image, imageTxId, curTxId)) { + if (!skipSharedEditsCheck && !checkLogsAvailableForRead(image, imageTxId, curTxId)) { return ERR_CODE_LOGS_UNAVAILABLE; }