Author: jitendra
Date: Thu Nov 24 01:52:43 2011
New Revision: 1205698

URL: http://svn.apache.org/viewvc?rev=1205698&view=rev
Log:
Merged r1205243 from branch-0.20-security for HDFS-2246.
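HDFS-2246 lets a DFSClient running on the same host as a Datanode open the replica's block file directly instead of streaming it over a socket. As a quick orientation before the diffs below, a minimal sketch (not part of this commit) of the two configuration knobs the change introduces; the user name is only an example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ShortCircuitConfSketch {
      public static void main(String[] args) {
        // Client side: opt in to short-circuit reads.
        Configuration clientConf = new Configuration();
        clientConf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);

        // Datanode side: name the one user allowed to ask for local block paths.
        Configuration dnConf = new Configuration();
        dnConf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "hbase");
      }
    }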
Added:
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
      - copied unchanged from r1205243, hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
      - copied unchanged from r1205243, hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
      - copied unchanged from r1205243, hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
Modified:
    hadoop/common/branches/branch-0.20-security-205/CHANGES.txt
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
    hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/branch-0.20-security-205/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/CHANGES.txt?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-205/CHANGES.txt Thu Nov 24 01:52:43 2011
@@ -46,6 +46,9 @@ Release 0.20.205.1 - unreleased
     HADOOP-7810. Add tools jar to the end of classpath. (John George via
     omalley)
 
+    HDFS-2246. Shortcut local client reads to a Datanode's files directly.
+    (Andrew Purtell, Suresh, Jitendra)
+
   BUG FIXES
 
     HADOOP-7827. jsp pages missing DOCTYPE. (Dave Vronay via mattf)
Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Nov 24 01:52:43 2011
@@ -93,6 +93,7 @@ public class DFSClient implements FSCons
   final int writePacketSize;
   private final FileSystem.Statistics stats;
   private int maxBlockAcquireFailures;
+  private boolean shortCircuitLocalReads;
 
   /**
    * We assume we're talking to another CDH server, which supports
@@ -144,6 +145,7 @@ public class DFSClient implements FSCons
         rpcNamenode, methodNameToPolicyMap);
   }
 
+  /** Create {@link ClientDatanodeProtocol} proxy with block/token */
  static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
       DatanodeID datanodeid, Configuration conf, Block block,
       Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException {
@@ -160,6 +162,20 @@ public class DFSClient implements FSCons
         .getDefaultSocketFactory(conf), socketTimeout);
   }
 
+  /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, int socketTimeout)
+      throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+        datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+    if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+      ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
+    }
+    return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
+        ClientDatanodeProtocol.versionID, addr, conf, NetUtils
+        .getDefaultSocketFactory(conf), socketTimeout);
+  }
+
   /**
    * Same as this(NameNode.getAddress(conf), conf);
    * @see #DFSClient(InetSocketAddress, Configuration)
@@ -206,7 +222,7 @@ public class DFSClient implements FSCons
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
-    
+
     ugi = UserGroupInformation.getCurrentUser();
 
     String taskId = conf.get("mapred.task.id");
@@ -229,6 +245,13 @@ public class DFSClient implements FSCons
           "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
           + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
     }
+    // read directly from the block file if configured.
+    this.shortCircuitLocalReads = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Short circuit read is " + shortCircuitLocalReads);
+    }
   }
 
   static int getMaxBlockAcquireFailures(Configuration conf) {
@@ -325,6 +348,82 @@ public class DFSClient implements FSCons
   }
 
   /**
+   * Get {@link BlockReader} for short circuited local reads.
+   */
+  private static BlockReader getLocalBlockReader(Configuration conf,
+      String src, Block blk, Token<BlockTokenIdentifier> accessToken,
+      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
+      throws InvalidToken, IOException {
+    try {
+      return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
+          chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
+          - offsetIntoBlock);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(InvalidToken.class,
+          AccessControlException.class);
+    }
+  }
+
+  private static Set<String> localIpAddresses = Collections
+      .synchronizedSet(new HashSet<String>());
+
+  private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+    InetAddress addr = targetAddr.getAddress();
+    if (localIpAddresses.contains(addr.getHostAddress())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Address " + targetAddr + " is local");
+      }
+      return true;
+    }
+
+    // Check if the address is any local or loop back
+    boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+    // Check if the address is defined on any interface
+    if (!local) {
+      try {
+        local = NetworkInterface.getByInetAddress(addr) != null;
+      } catch (SocketException e) {
+        local = false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Address " + targetAddr + " is local " + local);
+    }
+    if (local) {
+      localIpAddresses.add(addr.getHostAddress());
+    }
+    return local;
+  }
+
+  /**
+   * Should the block access token be refetched on an exception
+   *
+   * @param ex Exception received
+   * @param targetAddr Target datanode address from where exception was received
+   * @return true if block access token has expired or invalid and it should be
+   *         refetched
+   */
+  private static boolean tokenRefetchNeeded(IOException ex,
+      InetSocketAddress targetAddr) {
+    /*
+     * Get a new access token and retry. Retry is needed in 2 cases. 1) When
+     * both NN and DN re-started while DFSClient holding a cached access token.
+     * 2) In the case that NN fails to update its access key at pre-set interval
+     * (by a wide margin) and subsequently restarts. In this case, DN
+     * re-registers itself with NN and receives a new access key, but DN will
+     * delete the old access key from its memory since it's considered expired
+     * based on the estimated expiration date.
+     */
+    if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+      LOG.info("Access token was invalid when connecting to " + targetAddr
+          + " : " + ex);
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Cancel a delegation token
    * @param token the token to cancel
    * @throws InvalidToken
   */
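The isLocalAddress() check above reduces to asking the JDK whether any interface owns the target IP, with a cache in front. A self-contained sketch of the same technique (hypothetical class, standard JDK calls only):

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;

    public class LocalAddressSketch {
      /** True if the address is loopback, wildcard, or bound to a local interface. */
      public static boolean isLocal(InetSocketAddress target) {
        InetAddress addr = target.getAddress();
        if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
          return true;
        }
        try {
          // Non-null only when some local NIC carries this address.
          return NetworkInterface.getByInetAddress(addr) != null;
        } catch (SocketException e) {
          return false;
        }
      }

      public static void main(String[] args) {
        System.out.println(isLocal(new InetSocketAddress("127.0.0.1", 50010)));
      }
    }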
@@ -1312,16 +1411,16 @@ public class DFSClient implements FSCons
     private Socket dnSock; //for now just sending checksumOk.
     private DataInputStream in;
-    private DataChecksum checksum;
-    private long lastChunkOffset = -1;
-    private long lastChunkLen = -1;
+    protected DataChecksum checksum;
+    protected long lastChunkOffset = -1;
+    protected long lastChunkLen = -1;
 
     private long lastSeqNo = -1;
 
-    private long startOffset;
-    private long firstChunkOffset;
-    private int bytesPerChecksum;
-    private int checksumSize;
-    private boolean gotEOS = false;
+    protected long startOffset;
+    protected long firstChunkOffset;
+    protected int bytesPerChecksum;
+    protected int checksumSize;
+    protected boolean gotEOS = false;
 
     byte[] skipBuf = null;
     ByteBuffer checksumBytes = null;
@@ -1358,7 +1457,8 @@ public class DFSClient implements FSCons
       int nRead = super.read(buf, off, len);
 
       // if gotEOS was set in the previous read and checksum is enabled :
-      if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
+      if (dnSock != null && gotEOS && !eosBefore && nRead >= 0
+          && needChecksum()) {
         //checksum is verified and there are no errors.
         checksumOk(dnSock);
       }
@@ -1536,14 +1636,44 @@ public class DFSClient implements FSCons
       checksumSize = this.checksum.getChecksumSize();
     }
 
+    /** Package-private constructor */
+    BlockReader(Path file, int numRetries) {
+      super(file, numRetries);
+    }
+
+    protected BlockReader(Path file, int numRetries, DataChecksum checksum,
+        boolean verifyChecksum) {
+      super(file,
+          numRetries,
+          verifyChecksum,
+          checksum.getChecksumSize() > 0? checksum : null,
+          checksum.getBytesPerChecksum(),
+          checksum.getChecksumSize());
+    }
+
     public static BlockReader newBlockReader(Socket sock, String file, long blockId, 
         Token<BlockTokenIdentifier> accessToken,
         long genStamp, long startOffset, long len, int bufferSize) throws IOException {
       return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
           true);
     }
 
-    /** Java Doc required */
-    public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
+    /**
+     * Creates a new {@link BlockReader} for the given blockId.
+     * @param sock Socket to read the block.
+     * @param file File to which this block belongs.
+     * @param blockId Block id.
+     * @param accessToken Block access token.
+     * @param genStamp Generation stamp of the block.
+     * @param startOffset Start offset for the data.
+     * @param len Length to be read.
+     * @param bufferSize Buffer size to use.
+     * @param verifyChecksum Checksum verification is required or not.
+     * @return BlockReader object.
+     * @throws IOException
+     */
+    public static BlockReader newBlockReader(Socket sock, String file, long blockId, 
                                        Token<BlockTokenIdentifier> accessToken,
                                        long genStamp,
                                        long startOffset, long len,
@@ -1887,6 +2017,14 @@ public class DFSClient implements FSCons
       return blockRange;
     }
 
+    private boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
+        throws IOException {
+      if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
+        return true;
+      }
+      return false;
+    }
+
     /**
      * Open a DataInputStream to a DataNode so that it can be read from.
      * We get block ID and the IDs of the destinations at startup, from the namenode.
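The fields and constructors above go from private to protected so that the new BlockReaderLocal (copied into this branch by this commit; source not shown here) can extend BlockReader without a socket. A hypothetical sketch of the shape such a subclass takes, assuming it sits in package org.apache.hadoop.hdfs:

    package org.apache.hadoop.hdfs;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.DataChecksum;

    // Shape only; the real BlockReaderLocal differs in detail.
    class LocalReaderSketch extends DFSClient.BlockReader {
      LocalReaderSketch(Path file, int numRetries, DataChecksum checksum,
          boolean verifyChecksum) {
        // Uses the constructor made protected above. No socket is involved,
        // so dnSock stays null and read() skips the checksumOk() handshake
        // because of the new dnSock != null guard.
        super(file, numRetries, checksum, verifyChecksum);
      }
    }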
@@ -1923,13 +2061,37 @@ public class DFSClient implements FSCons
         chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
 
+        // try reading the block locally. if this fails, then go via
+        // the datanode
+        Block blk = targetBlock.getBlock();
+        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+        if (shouldTryShortCircuitRead(targetAddr)) {
+          try {
+            blockReader = getLocalBlockReader(conf, src, blk, accessToken,
+                chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock);
+            return chosenNode;
+          } catch (AccessControlException ex) {
+            LOG.warn("Short circuit access failed ", ex);
+            //Disable short circuit reads
+            shortCircuitLocalReads = false;
+          } catch (IOException ex) {
+            if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+              /* Get a new access token and retry. */
+              refetchToken--;
+              fetchBlockAt(target);
+              continue;
+            } else {
+              LOG.info("Failed to read block " + targetBlock.getBlock()
+                  + " on local machine: " + StringUtils.stringifyException(ex));
+              LOG.info("Try reading via the datanode on " + targetAddr);
+            }
+          }
+        }
+
         try {
           s = socketFactory.createSocket();
           NetUtils.connect(s, targetAddr, socketTimeout);
           s.setSoTimeout(socketTimeout);
-          Block blk = targetBlock.getBlock();
-          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-          
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
               accessToken, 
               blk.getGenerationStamp(),
@@ -1937,20 +2099,7 @@ public class DFSClient implements FSCons
               buffersize, verifyChecksum, clientName);
           return chosenNode;
         } catch (IOException ex) {
-          if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
-            LOG.info("Will fetch a new access token and retry, "
-                + "access token was invalid when connecting to " + targetAddr
-                + " : " + ex);
-            /*
-             * Get a new access token and retry. Retry is needed in 2 cases. 1)
-             * When both NN and DN re-started while DFSClient holding a cached
-             * access token. 2) In the case that NN fails to update its
-             * access key at pre-set interval (by a wide margin) and
-             * subsequently restarts. In this case, DN re-registers itself with
-             * NN and receives a new access key, but DN will delete the old
-             * access key from its memory since it's considered expired based on
-             * the estimated expiration date.
-             */
+          if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
             refetchToken--;
             fetchBlockAt(target);
           } else {
@@ -1965,8 +2114,7 @@ public class DFSClient implements FSCons
           if (s != null) {
             try {
               s.close();
-            } catch (IOException iex) {
-            }
+            } catch (IOException iex) { }
           }
           s = null;
         }
@@ -2154,21 +2302,31 @@ public class DFSClient implements FSCons
       DatanodeInfo chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
       BlockReader reader = null;
-          
+
+      int len = (int) (end - start + 1);
       try {
-        dn = socketFactory.createSocket();
-        NetUtils.connect(dn, targetAddr, socketTimeout);
-        dn.setSoTimeout(socketTimeout);
         Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
-            
-        int len = (int) (end - start + 1);
-
-        reader = BlockReader.newBlockReader(dn, src, 
-                                            block.getBlock().getBlockId(),
-                                            accessToken,
-                                            block.getBlock().getGenerationStamp(),
-                                            start, len, buffersize, 
-                                            verifyChecksum, clientName);
+        // first try reading the block locally.
+        if (shouldTryShortCircuitRead(targetAddr)) {
+          try {
+            reader = getLocalBlockReader(conf, src, block.getBlock(),
+                accessToken, chosenNode, DFSClient.this.socketTimeout, start);
+          } catch (AccessControlException ex) {
+            LOG.warn("Short circuit access failed ", ex);
+            //Disable short circuit reads
+            shortCircuitLocalReads = false;
+            continue;
+          }
+        } else {
+          // go to the datanode
+          dn = socketFactory.createSocket();
+          NetUtils.connect(dn, targetAddr, socketTimeout);
+          dn.setSoTimeout(socketTimeout);
+          reader = BlockReader.newBlockReader(dn, src,
+              block.getBlock().getBlockId(), accessToken,
+              block.getBlock().getGenerationStamp(), start, len, buffersize,
+              verifyChecksum, clientName);
+        }
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -2181,10 +2339,7 @@ public class DFSClient implements FSCons
                                 e.getPos() + " from " + chosenNode.getName());
         reportChecksumFailure(src, block.getBlock(), chosenNode);
       } catch (IOException e) {
-        if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
-          LOG.info("Will get a new access token and retry, "
-              + "access token was invalid when connecting to " + targetAddr
-              + " : " + e);
+        if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
           refetchToken--;
           fetchBlockAt(block.getStartOffset());
           continue;
@@ -3314,7 +3469,8 @@ public class DFSClient implements FSCons
         } catch (IOException ie) {
 
-          LOG.info("Exception in createBlockOutputStream " + ie);
+          LOG.info("Exception in createBlockOutputStream " + nodes[0].getName()
+              + " " + ie);
 
           // find the datanode that matches
           if (firstBadLink.length() != 0) {
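Nothing changes for applications: the read paths patched above try the local reader first and quietly fall back to the socket path. A sketch (not part of this commit; the file path is made up) of an ordinary read that would now be short-circuited when the replica is local and the feature is on:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/tmp/example.dat"));
        byte[] buf = new byte[4096];
        int n = in.read(buf); // served by BlockReaderLocal when possible
        System.out.println("read " + n + " bytes");
        in.close();
      }
    }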
Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Nov 24 01:52:43 2011
@@ -197,6 +197,10 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
   public static final String  DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
   public static final int     DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 100;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
 
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
@@ -219,4 +223,5 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
+  public static final String  DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
 }
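The new keys follow the usual key/default pairing, so callers read them the same way DFSClient reads the on/off switch. A sketch (hypothetical class; whether the skip-checksum knob is honoured is decided in BlockReaderLocal, not shown here):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ShortCircuitKeysSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        boolean on = conf.getBoolean(
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
        boolean skipChecksum = conf.getBoolean(
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
        System.out.println("shortcircuit=" + on + " skipChecksum=" + skipChecksum);
      }
    }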
Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Thu Nov 24 01:52:43 2011
@@ -21,12 +21,18 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
 /** An client-datanode protocol for block recovery */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 @TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
@@ -55,4 +61,29 @@ public interface ClientDatanodeProtocol
    * @throws IOException if the block does not exist
    */
   Block getBlockInfo(Block block) throws IOException;
+
+  /**
+   * Retrieves the path names of the block file and metadata file stored on the
+   * local file system.
+   *
+   * In order for this method to work, one of the following should be satisfied:
+   * <ul>
+   * <li>
+   * The client user must be configured at the datanode to be able to use this
+   * method.</li>
+   * <li>
+   * When security is enabled, kerberos authentication must be used to connect
+   * to the datanode.</li>
+   * </ul>
+   *
+   * @param block
+   *          the specified block on the local datanode
+   * @param token
+   *          the block access token.
+   * @return the BlockLocalPathInfo of a block
+   * @throws IOException
+   *           on error
+   */
+  BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException;
 }
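A hedged sketch of driving the new RPC by hand, pairing the kerberos-ticket proxy factory added to DFSClient with the method above. The helper class is hypothetical and assumed to live in package org.apache.hadoop.hdfs so the package-private factory is visible; dnId, blk and token would come from a LocatedBlock:

    package org.apache.hadoop.hdfs;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.security.token.Token;

    class LocalPathLookupSketch {
      static BlockLocalPathInfo lookup(DatanodeID dnId, Configuration conf,
          Block blk, Token<BlockTokenIdentifier> token, int socketTimeout)
          throws IOException {
        // Kerberos-ticket proxy variant added by this commit.
        ClientDatanodeProtocol proxy =
            DFSClient.createClientDatanodeProtocolProxy(dnId, conf, socketTimeout);
        try {
          return proxy.getBlockLocalPathInfo(blk, token);
        } finally {
          RPC.stopProxy(proxy);
        }
      }
    }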
Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Thu Nov 24 01:52:43 2011
@@ -26,6 +26,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 
 /**
@@ -33,7 +35,9 @@ import org.apache.hadoop.util.DataChecks
  * This is not related to the Block related functionality in Namenode.
  * The biggest part of data block metadata is CRC for the block.
  */
-class BlockMetadataHeader {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
 
   static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
 
@@ -49,12 +53,14 @@ class BlockMetadataHeader {
     this.checksum = checksum;
     this.version = version;
   }
-    
-  short getVersion() {
+
+  /** Get the version */
+  public short getVersion() {
     return version;
   }
 
-  DataChecksum getChecksum() {
+  /** Get the checksum */
+  public DataChecksum getChecksum() {
    return checksum;
   }
 
@@ -65,7 +71,7 @@ class BlockMetadataHeader {
    * @return Metadata Header
    * @throws IOException
    */
-  static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+  public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
     return readHeader(in.readShort(), in);
   }
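readHeader() and getChecksum() become public because a local reader has to parse the replica's metafile itself to learn the checksum type and chunk size. A sketch of that parse; metaPath is assumed to be the meta file path returned by getBlockLocalPathInfo():

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.FileInputStream;

    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
    import org.apache.hadoop.util.DataChecksum;

    public class MetaHeaderSketch {
      public static void main(String[] args) throws Exception {
        String metaPath = args[0]; // e.g. .../blk_<id>_<genstamp>.meta
        DataInputStream in = new DataInputStream(
            new BufferedInputStream(new FileInputStream(metaPath)));
        try {
          BlockMetadataHeader header = BlockMetadataHeader.readHeader(in);
          DataChecksum checksum = header.getChecksum();
          System.out.println("version=" + header.getVersion()
              + " bytesPerChecksum=" + checksum.getBytesPerChecksum());
        } finally {
          in.close();
        }
      }
    }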
Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Nov 24 01:52:43 2011
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -110,8 +114,10 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -225,6 +231,7 @@ public class DataNode extends Configured
   boolean isBlockTokenEnabled;
   BlockTokenSecretManager blockTokenSecretManager;
   boolean isBlockTokenInitialized = false;
+  final String userWithLocalPathAccess;
 
   /**
   * Testing hook that allows tests to delay the sending of blockReceived RPCs
@@ -277,6 +284,8 @@ public class DataNode extends Configured
     datanodeObject = this;
     supportAppends = conf.getBoolean("dfs.support.append", false);
+    this.userWithLocalPathAccess = conf
+        .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
     try {
       startDataNode(conf, dataDirs, resources);
     } catch (IOException ie) {
@@ -1722,6 +1731,89 @@ public class DataNode extends Configured
     throw new IOException("Unknown protocol to " + getClass().getSimpleName()
         + ": " + protocol);
   }
+
+  /** Ensure the authentication method is kerberos */
+  private void checkKerberosAuthMethod(String msg) throws IOException {
+    // User invoking the call must be same as the datanode user
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() != 
+        AuthenticationMethod.KERBEROS) {
+      throw new AccessControlException("Error in " + msg + ". Only "
+          + "kerberos based authentication is allowed.");
+    }
+  }
+
+  private void checkBlockLocalPathAccess() throws IOException {
+    checkKerberosAuthMethod("getBlockLocalPathInfo()");
+    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (!currentUser.equals(this.userWithLocalPathAccess)) {
+      throw new AccessControlException(
+          "Can't continue with getBlockLocalPathInfo() "
+              + "authorization. The user " + currentUser
+              + " is not allowed to call getBlockLocalPathInfo");
+    }
+  }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    checkBlockLocalPathAccess();
+    checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+    BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+    if (LOG.isDebugEnabled()) {
+      if (info != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("getBlockLocalPathInfo successful block=" + block
+              + " blockfile " + info.getBlockPath() + " metafile "
+              + info.getMetaPath());
+        }
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("getBlockLocalPathInfo for block=" + block
+              + " returning null");
+        }
+      }
+    }
+    myMetrics.incrBlocksGetLocalPathInfo();
+    return info;
+  }
+
+  private void checkBlockToken(Block block, Token<BlockTokenIdentifier> token,
+      AccessMode accessMode) throws IOException {
+    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+      BlockTokenIdentifier id = new BlockTokenIdentifier();
+      ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+      DataInputStream in = new DataInputStream(buf);
+      id.readFields(in);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got: " + id.toString());
+      }
+      blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+    }
+  }
+
+  /** Check block access token for the given access mode */
+  private void checkBlockToken(Block block,
+      BlockTokenSecretManager.AccessMode accessMode) throws IOException {
+    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+          .getTokenIdentifiers();
+      if (tokenIds.size() != 1) {
+        throw new IOException("Can't continue with "
+            + "authorization since " + tokenIds.size()
+            + " BlockTokenIdentifier " + "is found.");
+      }
+      for (TokenIdentifier tokenId : tokenIds) {
+        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got: " + id.toString());
+        }
+        blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+      }
+    }
+  }
 
  /** A convenient class used in lease recovery */
  private static class BlockRecord {
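Both gates above must pass: with security on, the caller has to authenticate via kerberos, and in all cases the caller's short user name has to equal dfs.block.local-path-access.user. A fragment sketching how a client might probe for that (names are illustrative; the unwrap mirrors getLocalBlockReader() in DFSClient above, and the imports match DFSClient's plus org.apache.hadoop.ipc.RemoteException and org.apache.hadoop.security.AccessControlException):

    static boolean canShortCircuit(ClientDatanodeProtocol proxy, Block blk,
        Token<BlockTokenIdentifier> token) throws IOException {
      try {
        return proxy.getBlockLocalPathInfo(blk, token) != null;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(AccessControlException.class);
        if (e instanceof AccessControlException) {
          // Wrong user, or not kerberos-authenticated: fall back to the
          // socket read path, as DFSClient does by clearing
          // shortCircuitLocalReads.
          return false;
        }
        throw e;
      }
    }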
@@ -1923,28 +2015,13 @@ public class DataNode extends Configured
   public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
       ) throws IOException {
     logRecoverBlock("Client", block, targets);
-    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
-      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
-          .getTokenIdentifiers();
-      if (tokenIds.size() != 1) {
-        throw new IOException("Can't continue with recoverBlock() "
-            + "authorization since " + tokenIds.size() + " BlockTokenIdentifier "
-            + "is found.");
-      }
-      for (TokenIdentifier tokenId : tokenIds) {
-        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Got: " + id.toString());
-        }
-        blockTokenSecretManager.checkAccess(id, null, block,
-            BlockTokenSecretManager.AccessMode.WRITE);
-      }
-    }
+    checkBlockToken(block, BlockTokenSecretManager.AccessMode.WRITE);
     return recoverBlock(block, keepLength, targets, false);
   }
 
   /** {@inheritDoc} */
   public Block getBlockInfo(Block block) throws IOException {
+    checkBlockToken(block, BlockTokenSecretManager.AccessMode.READ);
     Block stored = data.getStoredBlock(block.getBlockId());
     return stored;
   }

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Nov 24 01:52:43 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
@@ -1047,6 +1048,16 @@ public class FSDataset implements FSCons
     return f;
   }
 
+  @Override //FSDatasetInterface
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
+      throws IOException {
+    File datafile = getBlockFile(block);
+    File metafile = getMetaFile(datafile, block);
+    BlockLocalPathInfo info = new BlockLocalPathInfo(block,
+        datafile.getAbsolutePath(), metafile.getAbsolutePath());
+    return info;
+  }
+
   public synchronized InputStream getBlockInputStream(Block b) throws IOException {
     return new FileInputStream(getBlockFile(b));
   }
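On the datanode side getBlockLocalPathInfo() is only a path lookup; all the disk I/O happens in the client process. A fragment of what the local reader does with the two absolute paths (info and offsetIntoBlock are assumed to be in scope; java.io.FileInputStream and org.apache.hadoop.io.IOUtils imported):

    FileInputStream dataIn = new FileInputStream(info.getBlockPath());
    FileInputStream metaIn = new FileInputStream(info.getMetaPath());
    IOUtils.skipFully(dataIn, offsetIntoBlock);
    // ... read from dataIn, verifying chunks against checksums from metaIn,
    // unless checksum verification is configured off ...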
Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Nov 24 01:52:43 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,6 +31,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -309,4 +311,8 @@ public interface FSDatasetInterface exte
   public boolean hasEnoughResource();
 
   public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException;
+  /**
+   * Get {@link BlockLocalPathInfo} for the given block.
+   */
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block b) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java Thu Nov 24 01:52:43 2011
@@ -49,6 +49,8 @@ public class DataNodeInstrumentation imp
       registry.newCounter("blocks_verified", "", 0);
   final MetricMutableCounterInt blockVerificationFailures =
       registry.newCounter("block_verification_failures", "", 0);
+  final MetricMutableCounterInt blocksGetLocalPathInfo =
+      registry.newCounter("blocks_get_local_pathinfo", "", 0);
 
   final MetricMutableCounterInt readsFromLocalClient =
       registry.newCounter("reads_from_local_client", "", 0);
@@ -131,6 +133,11 @@ public class DataNodeInstrumentation imp
   }
 
   //@Override
+  public void incrBlocksGetLocalPathInfo() {
+    blocksGetLocalPathInfo.incr();
+  }
+
+  //@Override
   public void addReadBlockOp(long latency) {
     readBlockOp.add(latency);
   }

Modified: hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests Thu Nov 24 01:52:43 2011
@@ -110,6 +110,7 @@
 **/TestFileAppend.java
 **/TestFileCorruption.java
 **/TestFileLimit.java
+**/TestShortCircuitLocalRead.java
 **/TestFileStatus.java
 **/TestFSInputChecker.java
 **/TestFSOutputSummer.java
Modified: hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Nov 24 01:52:43 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,6 +32,7 @@ import javax.management.StandardMBean;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
@@ -695,4 +697,9 @@ public class SimulatedFSDataset impleme
     Block stored = getStoredBlock(blockId);
     return new BlockRecoveryInfo(stored, false);
   }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block blk) throws IOException {
+    throw new IOException("getBlockLocalPathInfo not supported.");
+  }
 }
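For end-to-end coverage the commit also wires TestShortCircuitLocalRead into the commit-tests list above. A rough, hypothetical reduction of such a test to its essentials (the real test is copied in at the top of this change and is more thorough):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ShortCircuitReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
            UserGroupInformation.getCurrentUser().getShortUserName());
        MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
        try {
          FileSystem fs = cluster.getFileSystem();
          Path p = new Path("/test/short_circuit");
          FSDataOutputStream out = fs.create(p);
          out.writeBytes("hello, short circuit");
          out.close();
          FSDataInputStream in = fs.open(p); // local replica -> BlockReaderLocal
          byte[] buf = new byte[32];
          int n = in.read(buf);
          in.close();
          System.out.println(new String(buf, 0, n));
        } finally {
          cluster.shutdown();
        }
      }
    }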