Author: todd Date: Tue Sep 6 00:41:47 2011 New Revision: 1165463 URL: http://svn.apache.org/viewvc?rev=1165463&view=rev Log: HDFS-2197. Refactor RPC call implementations out of NameNode class. Contributed by Todd Lipcon.
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeJspHelper.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1165463&r1=1165462&r2=1165463&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Sep 6 00:41:47 2011 @@ -11,6 +11,7 @@ Trunk (unreleased changes) HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants -> HdfsConstants. (Harsh J Chouraria via atm) + HDFS-2197. Refactor RPC call implementations out of NameNode class (todd) BUG FIXES HDFS-2287. TestParallelRead has a small off-by-one bug. (todd) Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1165463&r1=1165462&r2=1165463&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Tue Sep 6 00:41:47 2011 @@ -52,7 +52,7 @@ import org.apache.hadoop.net.NetUtils; * </ol> */ @InterfaceAudience.Private -public class BackupNode extends NameNode implements JournalProtocol { +public class BackupNode extends NameNode { private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY; private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT; private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY; @@ -74,7 +74,6 @@ public class BackupNode extends NameNode BackupNode(Configuration conf, NamenodeRole role) throws IOException { super(conf, role); - this.server.addProtocol(JournalProtocol.class, this); } ///////////////////////////////////////////////////// @@ -96,18 +95,20 @@ public class BackupNode extends NameNode } @Override // NameNode - protected void setRpcServerAddress(Configuration conf) { - conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(rpcAddress)); + protected void setRpcServerAddress(Configuration conf, + InetSocketAddress addr) { + conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(addr)); } @Override // Namenode - protected void setRpcServiceServerAddress(Configuration conf) { - conf.set(BN_SERVICE_RPC_ADDRESS_KEY, getHostPortString(serviceRPCAddress)); + protected void setRpcServiceServerAddress(Configuration conf, + InetSocketAddress addr) { + conf.set(BN_SERVICE_RPC_ADDRESS_KEY, getHostPortString(addr)); } @Override // NameNode protected InetSocketAddress getHttpServerAddress(Configuration conf) { - assert rpcAddress != null : "rpcAddress should be calculated first"; + assert getNameNodeAddress() != null : "rpcAddress should be calculated first"; String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT); return NetUtils.createSocketAddr(addr); } @@ -146,6 +147,12 @@ public class BackupNode extends NameNode runCheckpointDaemon(conf); } + @Override + protected NameNodeRpcServer createRpcServer(Configuration conf) + throws IOException { + return new BackupNodeRpcServer(conf, this); + } + @Override // NameNode public void stop() { if(checkpointManager != null) { @@ -178,74 +185,84 @@ public class BackupNode extends NameNode super.stop(); } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - if (protocol.equals(JournalProtocol.class.getName())) { - return JournalProtocol.versionID; - } else { - return super.getProtocolVersion(protocol, clientVersion); + static class BackupNodeRpcServer extends NameNodeRpcServer implements JournalProtocol { + private final String nnRpcAddress; + + private BackupNodeRpcServer(Configuration conf, BackupNode nn) + throws IOException { + super(conf, nn); + this.server.addProtocol(JournalProtocol.class, this); + nnRpcAddress = nn.nnRpcAddress; } - } - ///////////////////////////////////////////////////// - // NamenodeProtocol implementation for backup node. - ///////////////////////////////////////////////////// - @Override // NamenodeProtocol - public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) - throws IOException { - throw new UnsupportedActionException("getBlocks"); - } - - // Only active name-node can register other nodes. - @Override // NamenodeProtocol - public NamenodeRegistration register(NamenodeRegistration registration - ) throws IOException { - throw new UnsupportedActionException("register"); - } - - @Override // NamenodeProtocol - public NamenodeCommand startCheckpoint(NamenodeRegistration registration) - throws IOException { - throw new UnsupportedActionException("startCheckpoint"); - } - - @Override // NamenodeProtocol - public void endCheckpoint(NamenodeRegistration registration, - CheckpointSignature sig) throws IOException { - throw new UnsupportedActionException("endCheckpoint"); - } - - ///////////////////////////////////////////////////// - // BackupNodeProtocol implementation for backup node. - ///////////////////////////////////////////////////// - - @Override - public void journal(NamenodeRegistration nnReg, - long firstTxId, int numTxns, - byte[] records) throws IOException { - verifyRequest(nnReg); - if(!nnRpcAddress.equals(nnReg.getAddress())) - throw new IOException("Journal request from unexpected name-node: " - + nnReg.getAddress() + " expecting " + nnRpcAddress); - getBNImage().journal(firstTxId, numTxns, records); - } - - @Override - public void startLogSegment(NamenodeRegistration registration, long txid) - throws IOException { - verifyRequest(registration); + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + if (protocol.equals(JournalProtocol.class.getName())) { + return JournalProtocol.versionID; + } else { + return super.getProtocolVersion(protocol, clientVersion); + } + } - getBNImage().namenodeStartedLogSegment(txid); - } - - ////////////////////////////////////////////////////// + ///////////////////////////////////////////////////// + // NamenodeProtocol implementation for backup node. + ///////////////////////////////////////////////////// + @Override // NamenodeProtocol + public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) + throws IOException { + throw new UnsupportedActionException("getBlocks"); + } + + // Only active name-node can register other nodes. + @Override // NamenodeProtocol + public NamenodeRegistration register(NamenodeRegistration registration + ) throws IOException { + throw new UnsupportedActionException("register"); + } + + @Override // NamenodeProtocol + public NamenodeCommand startCheckpoint(NamenodeRegistration registration) + throws IOException { + throw new UnsupportedActionException("startCheckpoint"); + } + @Override // NamenodeProtocol + public void endCheckpoint(NamenodeRegistration registration, + CheckpointSignature sig) throws IOException { + throw new UnsupportedActionException("endCheckpoint"); + } - BackupImage getBNImage() { - return (BackupImage)getFSImage(); + ///////////////////////////////////////////////////// + // BackupNodeProtocol implementation for backup node. + ///////////////////////////////////////////////////// + + @Override + public void journal(NamenodeRegistration nnReg, + long firstTxId, int numTxns, + byte[] records) throws IOException { + verifyRequest(nnReg); + if(!nnRpcAddress.equals(nnReg.getAddress())) + throw new IOException("Journal request from unexpected name-node: " + + nnReg.getAddress() + " expecting " + rpcAddress); + getBNImage().journal(firstTxId, numTxns, records); + } + + @Override + public void startLogSegment(NamenodeRegistration registration, long txid) + throws IOException { + verifyRequest(registration); + + getBNImage().namenodeStartedLogSegment(txid); + } + + private BackupImage getBNImage() { + return (BackupImage)nn.getFSImage(); + } } + + ////////////////////////////////////////////////////// + boolean shouldCheckpointAtStartup() { FSImage fsImage = getFSImage(); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java?rev=1165463&r1=1165462&r2=1165463&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java Tue Sep 6 00:41:47 2011 @@ -69,7 +69,7 @@ public class CancelDelegationTokenServle try { ugi.doAs(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { - nn.cancelDelegationToken(token); + nn.getRpcServer().cancelDelegationToken(token); return null; } }); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java?rev=1165463&r1=1165462&r2=1165463&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java Tue Sep 6 00:41:47 2011 @@ -73,7 +73,7 @@ abstract class DfsServlet extends HttpSe // rpc NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context); if (nn != null) { - return nn; + return nn.getRpcServer(); } InetSocketAddress nnAddr = NameNodeHttpServer.getNameNodeAddressFromContext(context); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java?rev=1165463&r1=1165462&r2=1165463&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java Tue Sep 6 00:41:47 2011 @@ -75,7 +75,7 @@ public class GetDelegationTokenServlet e + ":" + NameNode.getAddress(conf).getPort(); Token<DelegationTokenIdentifier> token = - nn.getDelegationToken(new Text(renewerFinal)); + nn.getRpcServer().getDelegationToken(new Text(renewerFinal)); if(token == null) { throw new Exception("couldn't get the token for " +s); } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1165463&r1=1165462&r2=1165463&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Sep 6 00:41:47 2011 @@ -17,13 +17,14 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; + import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; -import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -32,79 +33,21 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.FsServerDefaults; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Trash; -import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.permission.PermissionStatus; -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_LENGTH; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_DEPTH; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; -import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; -import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; -import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; -import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; -import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; -import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; -import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; -import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; -import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; -import org.apache.hadoop.io.EnumSetWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.net.Node; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.Groups; -import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.StringUtils; @@ -146,7 +89,7 @@ import org.apache.hadoop.util.StringUtil * NameNode state, for example partial blocksMap etc. **********************************************************/ @InterfaceAudience.Private -public class NameNode implements NamenodeProtocols { +public class NameNode { static{ HdfsConfiguration.init(); } @@ -179,32 +122,6 @@ public class NameNode implements Namenod DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY }; - public long getProtocolVersion(String protocol, - long clientVersion) throws IOException { - if (protocol.equals(ClientProtocol.class.getName())) { - return ClientProtocol.versionID; - } else if (protocol.equals(DatanodeProtocol.class.getName())){ - return DatanodeProtocol.versionID; - } else if (protocol.equals(NamenodeProtocol.class.getName())){ - return NamenodeProtocol.versionID; - } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){ - return RefreshAuthorizationPolicyProtocol.versionID; - } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){ - return RefreshUserMappingsProtocol.versionID; - } else if (protocol.equals(GetUserMappingsProtocol.class.getName())){ - return GetUserMappingsProtocol.versionID; - } else { - throw new IOException("Unknown protocol to name node: " + protocol); - } - } - - - @Override // VersionedProtocol - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - return ProtocolSignature.getProtocolSignature( - this, protocol, clientVersion, clientMethodsHash); - } public static final int DEFAULT_PORT = 8020; @@ -213,18 +130,6 @@ public class NameNode implements Namenod protected FSNamesystem namesystem; protected NamenodeRole role; - /** RPC server. Package-protected for use in tests. */ - RPC.Server server; - /** RPC server for HDFS Services communication. - BackupNode, Datanodes and all other services - should be connecting to this server if it is - configured. Clients should only go to NameNode#server - */ - protected Server serviceRpcServer; - /** RPC server address */ - protected InetSocketAddress rpcAddress = null; - /** RPC server for DN address */ - protected InetSocketAddress serviceRPCAddress = null; /** httpServer */ protected NameNodeHttpServer httpServer; private Thread emptier; @@ -232,11 +137,11 @@ public class NameNode implements Namenod protected boolean stopRequested = false; /** Registration information of this name-node */ protected NamenodeRegistration nodeRegistration; - /** Is service level authorization enabled? */ - private boolean serviceAuthEnabled = false; /** Activated plug-ins. */ private List<ServicePlugin> plugins; + private NameNodeRpcServer rpcServer; + /** Format a new filesystem. Destroys any filesystem that may already * exist at this location. **/ public static void format(Configuration conf) throws IOException { @@ -252,6 +157,10 @@ public class NameNode implements Namenod return namesystem; } + public NamenodeProtocols getRpcServer() { + return rpcServer; + } + static void initMetrics(Configuration conf, NamenodeRole role) { metrics = NameNodeMetrics.create(conf, role); } @@ -359,11 +268,13 @@ public class NameNode implements Namenod /** * Modifies the configuration passed to contain the service rpc address setting */ - protected void setRpcServiceServerAddress(Configuration conf) { + protected void setRpcServiceServerAddress(Configuration conf, + InetSocketAddress serviceRPCAddress) { setServiceAddress(conf, getHostPortString(serviceRPCAddress)); } - protected void setRpcServerAddress(Configuration conf) { + protected void setRpcServerAddress(Configuration conf, + InetSocketAddress rpcAddress) { FileSystem.setDefaultUri(conf, getUri(rpcAddress)); } @@ -387,7 +298,7 @@ public class NameNode implements Namenod NamenodeRegistration setRegistration() { nodeRegistration = new NamenodeRegistration( - getHostPortString(rpcAddress), + getHostPortString(rpcServer.getRpcAddress()), getHostPortString(getHttpAddress()), getFSImage().getStorage(), getRole()); return nodeRegistration; @@ -408,45 +319,13 @@ public class NameNode implements Namenod * @param conf the configuration */ protected void initialize(Configuration conf) throws IOException { - InetSocketAddress socAddr = getRpcServerAddress(conf); UserGroupInformation.setConfiguration(conf); loginAsNameNodeUser(conf); - int handlerCount = - conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, - DFS_DATANODE_HANDLER_COUNT_DEFAULT); NameNode.initMetrics(conf, this.getRole()); loadNamesystem(conf); - // create rpc server - InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf); - if (dnSocketAddr != null) { - int serviceHandlerCount = - conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, - DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); - this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this, - dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, - false, conf, namesystem.getDelegationTokenSecretManager()); - this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); - setRpcServiceServerAddress(conf); - } - this.server = RPC.getServer(NamenodeProtocols.class, this, - socAddr.getHostName(), socAddr.getPort(), - handlerCount, false, conf, - namesystem.getDelegationTokenSecretManager()); - - // set service-level authorization security policy - if (serviceAuthEnabled = - conf.getBoolean( - CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { - this.server.refreshServiceAcl(conf, new HDFSPolicyProvider()); - if (this.serviceRpcServer != null) { - this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); - } - } - // The rpc-server port can be ephemeral... ensure we have the correct info - this.rpcAddress = this.server.getListenerAddress(); - setRpcServerAddress(conf); + rpcServer = createRpcServer(conf); try { validateConfigurationSettings(conf); @@ -456,13 +335,22 @@ public class NameNode implements Namenod } activate(conf); - LOG.info(getRole() + " up at: " + rpcAddress); - if (serviceRPCAddress != null) { - LOG.info(getRole() + " service server is up at: " + serviceRPCAddress); + LOG.info(getRole() + " up at: " + rpcServer.getRpcAddress()); + if (rpcServer.getServiceRpcAddress() != null) { + LOG.info(getRole() + " service server is up at: " + rpcServer.getServiceRpcAddress()); } } /** + * Create the RPC server implementation. Used as an extension point for the + * BackupNode. + */ + protected NameNodeRpcServer createRpcServer(Configuration conf) + throws IOException { + return new NameNodeRpcServer(conf, this); + } + + /** * Verifies that the final Configuration Settings look ok for the NameNode to * properly start up * Things to check for include: @@ -494,10 +382,7 @@ public class NameNode implements Namenod } namesystem.activate(conf); startHttpServer(conf); - server.start(); //start RPC server - if (serviceRpcServer != null) { - serviceRpcServer.start(); - } + rpcServer.start(); startTrashEmptier(conf); plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, @@ -577,7 +462,7 @@ public class NameNode implements Namenod */ public void join() { try { - this.server.join(); + this.rpcServer.join(); } catch (InterruptedException ie) { } } @@ -607,8 +492,7 @@ public class NameNode implements Namenod } if(namesystem != null) namesystem.close(); if(emptier != null) emptier.interrupt(); - if(server != null) server.stop(); - if(serviceRpcServer != null) serviceRpcServer.stop(); + if(rpcServer != null) rpcServer.stop(); if (metrics != null) { metrics.shutdown(); } @@ -621,410 +505,6 @@ public class NameNode implements Namenod return stopRequested; } - ///////////////////////////////////////////////////// - // NamenodeProtocol - ///////////////////////////////////////////////////// - @Override // NamenodeProtocol - public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) - throws IOException { - if(size <= 0) { - throw new IllegalArgumentException( - "Unexpected not positive size: "+size); - } - - return namesystem.getBlockManager().getBlocks(datanode, size); - } - - @Override // NamenodeProtocol - public ExportedBlockKeys getBlockKeys() throws IOException { - return namesystem.getBlockManager().getBlockKeys(); - } - - @Override // NamenodeProtocol - public void errorReport(NamenodeRegistration registration, - int errorCode, - String msg) throws IOException { - verifyRequest(registration); - LOG.info("Error report from " + registration + ": " + msg); - if(errorCode == FATAL) - namesystem.releaseBackupNode(registration); - } - - @Override // NamenodeProtocol - public NamenodeRegistration register(NamenodeRegistration registration) - throws IOException { - verifyVersion(registration.getVersion()); - NamenodeRegistration myRegistration = setRegistration(); - namesystem.registerBackupNode(registration, myRegistration); - return myRegistration; - } - - @Override // NamenodeProtocol - public NamenodeCommand startCheckpoint(NamenodeRegistration registration) - throws IOException { - verifyRequest(registration); - if(!isRole(NamenodeRole.NAMENODE)) - throw new IOException("Only an ACTIVE node can invoke startCheckpoint."); - return namesystem.startCheckpoint(registration, setRegistration()); - } - - @Override // NamenodeProtocol - public void endCheckpoint(NamenodeRegistration registration, - CheckpointSignature sig) throws IOException { - verifyRequest(registration); - if(!isRole(NamenodeRole.NAMENODE)) - throw new IOException("Only an ACTIVE node can invoke endCheckpoint."); - namesystem.endCheckpoint(registration, sig); - } - - @Override // ClientProtocol - public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) - throws IOException { - return namesystem.getDelegationToken(renewer); - } - - @Override // ClientProtocol - public long renewDelegationToken(Token<DelegationTokenIdentifier> token) - throws InvalidToken, IOException { - return namesystem.renewDelegationToken(token); - } - - @Override // ClientProtocol - public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) - throws IOException { - namesystem.cancelDelegationToken(token); - } - - @Override // ClientProtocol - public LocatedBlocks getBlockLocations(String src, - long offset, - long length) - throws IOException { - metrics.incrGetBlockLocations(); - return namesystem.getBlockLocations(getClientMachine(), - src, offset, length); - } - - @Override // ClientProtocol - public FsServerDefaults getServerDefaults() throws IOException { - return namesystem.getServerDefaults(); - } - - @Override // ClientProtocol - public void create(String src, - FsPermission masked, - String clientName, - EnumSetWritable<CreateFlag> flag, - boolean createParent, - short replication, - long blockSize) throws IOException { - String clientMachine = getClientMachine(); - if (stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.create: file " - +src+" for "+clientName+" at "+clientMachine); - } - if (!checkPathLength(src)) { - throw new IOException("create: Pathname too long. Limit " - + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); - } - namesystem.startFile(src, - new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(), - null, masked), - clientName, clientMachine, flag.get(), createParent, replication, blockSize); - metrics.incrFilesCreated(); - metrics.incrCreateFileOps(); - } - - @Override // ClientProtocol - public LocatedBlock append(String src, String clientName) - throws IOException { - String clientMachine = getClientMachine(); - if (stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.append: file " - +src+" for "+clientName+" at "+clientMachine); - } - LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine); - metrics.incrFilesAppended(); - return info; - } - - @Override // ClientProtocol - public boolean recoverLease(String src, String clientName) throws IOException { - String clientMachine = getClientMachine(); - return namesystem.recoverLease(src, clientName, clientMachine); - } - - @Override // ClientProtocol - public boolean setReplication(String src, short replication) - throws IOException { - return namesystem.setReplication(src, replication); - } - - @Override // ClientProtocol - public void setPermission(String src, FsPermission permissions) - throws IOException { - namesystem.setPermission(src, permissions); - } - - @Override // ClientProtocol - public void setOwner(String src, String username, String groupname) - throws IOException { - namesystem.setOwner(src, username, groupname); - } - - @Override // ClientProtocol - public LocatedBlock addBlock(String src, - String clientName, - ExtendedBlock previous, - DatanodeInfo[] excludedNodes) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " - +src+" for "+clientName); - } - HashMap<Node, Node> excludedNodesSet = null; - if (excludedNodes != null) { - excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length); - for (Node node:excludedNodes) { - excludedNodesSet.put(node, node); - } - } - LocatedBlock locatedBlock = - namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet); - if (locatedBlock != null) - metrics.incrAddBlockOps(); - return locatedBlock; - } - - @Override // ClientProtocol - public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk, - final DatanodeInfo[] existings, final DatanodeInfo[] excludes, - final int numAdditionalNodes, final String clientName - ) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("getAdditionalDatanode: src=" + src - + ", blk=" + blk - + ", existings=" + Arrays.asList(existings) - + ", excludes=" + Arrays.asList(excludes) - + ", numAdditionalNodes=" + numAdditionalNodes - + ", clientName=" + clientName); - } - - metrics.incrGetAdditionalDatanodeOps(); - - HashMap<Node, Node> excludeSet = null; - if (excludes != null) { - excludeSet = new HashMap<Node, Node>(excludes.length); - for (Node node : excludes) { - excludeSet.put(node, node); - } - } - return namesystem.getAdditionalDatanode(src, blk, - existings, excludeSet, numAdditionalNodes, clientName); - } - - /** - * The client needs to give up on the block. - */ - public void abandonBlock(ExtendedBlock b, String src, String holder) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: " - +b+" of file "+src); - } - if (!namesystem.abandonBlock(b, src, holder)) { - throw new IOException("Cannot abandon block during write to " + src); - } - } - - @Override // ClientProtocol - public boolean complete(String src, String clientName, ExtendedBlock last) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.complete: " - + src + " for " + clientName); - } - return namesystem.completeFile(src, clientName, last); - } - - /** - * The client has detected an error on the specified located blocks - * and is reporting them to the server. For now, the namenode will - * mark the block as corrupt. In the future we might - * check the blocks are actually corrupt. - */ - @Override - public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { - stateChangeLog.info("*DIR* NameNode.reportBadBlocks"); - for (int i = 0; i < blocks.length; i++) { - ExtendedBlock blk = blocks[i].getBlock(); - DatanodeInfo[] nodes = blocks[i].getLocations(); - for (int j = 0; j < nodes.length; j++) { - DatanodeInfo dn = nodes[j]; - namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn); - } - } - } - - @Override // ClientProtocol - public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName) - throws IOException { - return namesystem.updateBlockForPipeline(block, clientName); - } - - - @Override // ClientProtocol - public void updatePipeline(String clientName, ExtendedBlock oldBlock, - ExtendedBlock newBlock, DatanodeID[] newNodes) - throws IOException { - namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes); - } - - @Override // DatanodeProtocol - public void commitBlockSynchronization(ExtendedBlock block, - long newgenerationstamp, long newlength, - boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) - throws IOException { - namesystem.commitBlockSynchronization(block, - newgenerationstamp, newlength, closeFile, deleteblock, newtargets); - } - - @Override // ClientProtocol - public long getPreferredBlockSize(String filename) - throws IOException { - return namesystem.getPreferredBlockSize(filename); - } - - @Deprecated - @Override // ClientProtocol - public boolean rename(String src, String dst) throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); - } - if (!checkPathLength(dst)) { - throw new IOException("rename: Pathname too long. Limit " - + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); - } - boolean ret = namesystem.renameTo(src, dst); - if (ret) { - metrics.incrFilesRenamed(); - } - return ret; - } - - @Override // ClientProtocol - public void concat(String trg, String[] src) throws IOException { - namesystem.concat(trg, src); - } - - @Override // ClientProtocol - public void rename(String src, String dst, Options.Rename... options) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); - } - if (!checkPathLength(dst)) { - throw new IOException("rename: Pathname too long. Limit " - + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); - } - namesystem.renameTo(src, dst, options); - metrics.incrFilesRenamed(); - } - - @Deprecated - @Override // ClientProtocol - public boolean delete(String src) throws IOException { - return delete(src, true); - } - - @Override // ClientProtocol - public boolean delete(String src, boolean recursive) throws IOException { - if (stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* Namenode.delete: src=" + src - + ", recursive=" + recursive); - } - boolean ret = namesystem.delete(src, recursive); - if (ret) - metrics.incrDeleteFileOps(); - return ret; - } - - /** - * Check path length does not exceed maximum. Returns true if - * length and depth are okay. Returns false if length is too long - * or depth is too great. - */ - private boolean checkPathLength(String src) { - Path srcPath = new Path(src); - return (src.length() <= MAX_PATH_LENGTH && - srcPath.depth() <= MAX_PATH_DEPTH); - } - - @Override // ClientProtocol - public boolean mkdirs(String src, FsPermission masked, boolean createParent) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src); - } - if (!checkPathLength(src)) { - throw new IOException("mkdirs: Pathname too long. Limit " - + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); - } - return namesystem.mkdirs(src, - new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(), - null, masked), createParent); - } - - @Override // ClientProtocol - public void renewLease(String clientName) throws IOException { - namesystem.renewLease(clientName); - } - - @Override // ClientProtocol - public DirectoryListing getListing(String src, byte[] startAfter, - boolean needLocation) - throws IOException { - DirectoryListing files = namesystem.getListing( - src, startAfter, needLocation); - if (files != null) { - metrics.incrGetListingOps(); - metrics.incrFilesInGetListingOps(files.getPartialListing().length); - } - return files; - } - - @Override // ClientProtocol - public HdfsFileStatus getFileInfo(String src) throws IOException { - metrics.incrFileInfoOps(); - return namesystem.getFileInfo(src, true); - } - - @Override // ClientProtocol - public HdfsFileStatus getFileLinkInfo(String src) throws IOException { - metrics.incrFileInfoOps(); - return namesystem.getFileInfo(src, false); - } - - @Override - public long[] getStats() { - return namesystem.getStats(); - } - - @Override // ClientProtocol - public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) - throws IOException { - DatanodeInfo results[] = namesystem.datanodeReport(type); - if (results == null ) { - throw new IOException("Cannot find datanode report"); - } - return results; - } - - @Override // ClientProtocol - public boolean setSafeMode(SafeModeAction action) throws IOException { - return namesystem.setSafeMode(action); - } - /** * Is the cluster currently in safe mode? */ @@ -1032,257 +512,6 @@ public class NameNode implements Namenod return namesystem.isInSafeMode(); } - @Override // ClientProtocol - public boolean restoreFailedStorage(String arg) - throws AccessControlException { - return namesystem.restoreFailedStorage(arg); - } - - @Override // ClientProtocol - public void saveNamespace() throws IOException { - namesystem.saveNamespace(); - } - - @Override // ClientProtocol - public void refreshNodes() throws IOException { - namesystem.getBlockManager().getDatanodeManager().refreshNodes( - new HdfsConfiguration()); - } - - @Override // NamenodeProtocol - public long getTransactionID() { - return namesystem.getEditLog().getSyncTxId(); - } - - @Override // NamenodeProtocol - public CheckpointSignature rollEditLog() throws IOException { - return namesystem.rollEditLog(); - } - - @Override - public RemoteEditLogManifest getEditLogManifest(long sinceTxId) - throws IOException { - return namesystem.getEditLog().getEditLogManifest(sinceTxId); - } - - @Override // ClientProtocol - public void finalizeUpgrade() throws IOException { - namesystem.finalizeUpgrade(); - } - - @Override // ClientProtocol - public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) - throws IOException { - return namesystem.distributedUpgradeProgress(action); - } - - @Override // ClientProtocol - public void metaSave(String filename) throws IOException { - namesystem.metaSave(filename); - } - - @Override // ClientProtocol - public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) - throws IOException { - Collection<FSNamesystem.CorruptFileBlockInfo> fbs = - namesystem.listCorruptFileBlocks(path, cookie); - - String[] files = new String[fbs.size()]; - String lastCookie = ""; - int i = 0; - for(FSNamesystem.CorruptFileBlockInfo fb: fbs) { - files[i++] = fb.path; - lastCookie = fb.block.getBlockName(); - } - return new CorruptFileBlocks(files, lastCookie); - } - - /** - * Tell all datanodes to use a new, non-persistent bandwidth value for - * dfs.datanode.balance.bandwidthPerSec. - * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes. - * @throws IOException - */ - @Override // ClientProtocol - public void setBalancerBandwidth(long bandwidth) throws IOException { - namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth); - } - - @Override // ClientProtocol - public ContentSummary getContentSummary(String path) throws IOException { - return namesystem.getContentSummary(path); - } - - @Override // ClientProtocol - public void setQuota(String path, long namespaceQuota, long diskspaceQuota) - throws IOException { - namesystem.setQuota(path, namespaceQuota, diskspaceQuota); - } - - @Override // ClientProtocol - public void fsync(String src, String clientName) throws IOException { - namesystem.fsync(src, clientName); - } - - @Override // ClientProtocol - public void setTimes(String src, long mtime, long atime) - throws IOException { - namesystem.setTimes(src, mtime, atime); - } - - @Override // ClientProtocol - public void createSymlink(String target, String link, FsPermission dirPerms, - boolean createParent) throws IOException { - metrics.incrCreateSymlinkOps(); - /* We enforce the MAX_PATH_LENGTH limit even though a symlink target - * URI may refer to a non-HDFS file system. - */ - if (!checkPathLength(link)) { - throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH + - " character limit"); - - } - if ("".equals(target)) { - throw new IOException("Invalid symlink target"); - } - final UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - namesystem.createSymlink(target, link, - new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent); - } - - @Override // ClientProtocol - public String getLinkTarget(String path) throws IOException { - metrics.incrGetLinkTargetOps(); - /* Resolves the first symlink in the given path, returning a - * new path consisting of the target of the symlink and any - * remaining path components from the original path. - */ - try { - HdfsFileStatus stat = namesystem.getFileInfo(path, false); - if (stat != null) { - // NB: getSymlink throws IOException if !stat.isSymlink() - return stat.getSymlink(); - } - } catch (UnresolvedPathException e) { - return e.getResolvedPath().toString(); - } catch (UnresolvedLinkException e) { - // The NameNode should only throw an UnresolvedPathException - throw new AssertionError("UnresolvedLinkException thrown"); - } - return null; - } - - - @Override // DatanodeProtocol - public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg) - throws IOException { - verifyVersion(nodeReg.getVersion()); - namesystem.registerDatanode(nodeReg); - - return nodeReg; - } - - @Override // DatanodeProtocol - public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg, - long capacity, long dfsUsed, long remaining, long blockPoolUsed, - int xmitsInProgress, int xceiverCount, int failedVolumes) - throws IOException { - verifyRequest(nodeReg); - return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining, - blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes); - } - - @Override // DatanodeProtocol - public DatanodeCommand blockReport(DatanodeRegistration nodeReg, - String poolId, long[] blocks) throws IOException { - verifyRequest(nodeReg); - BlockListAsLongs blist = new BlockListAsLongs(blocks); - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.blockReport: " - + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks() - + " blocks"); - } - - namesystem.getBlockManager().processReport(nodeReg, poolId, blist); - if (getFSImage().isUpgradeFinalized()) - return new DatanodeCommand.Finalize(poolId); - return null; - } - - @Override // DatanodeProtocol - public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, - ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException { - verifyRequest(nodeReg); - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " - +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length - +" blocks."); - } - namesystem.getBlockManager().blockReceivedAndDeleted( - nodeReg, poolId, receivedAndDeletedBlocks); - } - - @Override // DatanodeProtocol - public void errorReport(DatanodeRegistration nodeReg, - int errorCode, String msg) throws IOException { - String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName()); - - if (errorCode == DatanodeProtocol.NOTIFY) { - LOG.info("Error report from " + dnName + ": " + msg); - return; - } - verifyRequest(nodeReg); - - if (errorCode == DatanodeProtocol.DISK_ERROR) { - LOG.warn("Disk error on " + dnName + ": " + msg); - } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) { - LOG.warn("Fatal disk error on " + dnName + ": " + msg); - namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg); - } else { - LOG.info("Error report from " + dnName + ": " + msg); - } - } - - @Override // DatanodeProtocol, NamenodeProtocol - public NamespaceInfo versionRequest() throws IOException { - return namesystem.getNamespaceInfo(); - } - - @Override // DatanodeProtocol - public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException { - return namesystem.processDistributedUpgradeCommand(comm); - } - - /** - * Verify request. - * - * Verifies correctness of the datanode version, registration ID, and - * if the datanode does not need to be shutdown. - * - * @param nodeReg data node registration - * @throws IOException - */ - void verifyRequest(NodeRegistration nodeReg) throws IOException { - verifyVersion(nodeReg.getVersion()); - if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) { - LOG.warn("Invalid registrationID - expected: " - + namesystem.getRegistrationID() + " received: " - + nodeReg.getRegistrationID()); - throw new UnregisteredNodeException(nodeReg); - } - } - - /** - * Verify version. - * - * @param version - * @throws IOException - */ - void verifyVersion(int version) throws IOException { - if (version != HdfsConstants.LAYOUT_VERSION) - throw new IncorrectVersionException(version, "data node"); - } - /** get FSImage */ FSImage getFSImage() { return namesystem.dir.fsImage; @@ -1293,7 +522,7 @@ public class NameNode implements Namenod * @return namenode rpc address */ public InetSocketAddress getNameNodeAddress() { - return rpcAddress; + return rpcServer.getRpcAddress(); } /** @@ -1302,7 +531,7 @@ public class NameNode implements Namenod * @return namenode service rpc address used by datanodes */ public InetSocketAddress getServiceRpcAddress() { - return serviceRPCAddress != null ? serviceRPCAddress : rpcAddress; + return rpcServer.getServiceRpcAddress() != null ? rpcServer.getServiceRpcAddress() : rpcServer.getRpcAddress(); } /** @@ -1387,40 +616,6 @@ public class NameNode implements Namenod return false; } - @Override // RefreshAuthorizationPolicyProtocol - public void refreshServiceAcl() throws IOException { - if (!serviceAuthEnabled) { - throw new AuthorizationException("Service Level Authorization not enabled!"); - } - - this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider()); - if (this.serviceRpcServer != null) { - this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider()); - } - } - - @Override // RefreshAuthorizationPolicyProtocol - public void refreshUserToGroupsMappings() throws IOException { - LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + - UserGroupInformation.getCurrentUser().getShortUserName()); - Groups.getUserToGroupsMappingService().refresh(); - } - - @Override // RefreshAuthorizationPolicyProtocol - public void refreshSuperUserGroupsConfiguration() { - LOG.info("Refreshing SuperUser proxy group mapping list "); - - ProxyUsers.refreshSuperUserGroupsConfiguration(); - } - - @Override // GetUserMappingsProtocol - public String[] getGroupsForUser(String user) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Getting groups for user " + user); - } - return UserGroupInformation.createRemoteUser(user).getGroupNames(); - } - private static void printUsage() { System.err.println( "Usage: java NameNode [" + @@ -1593,11 +788,4 @@ public class NameNode implements Namenod } } - private static String getClientMachine() { - String clientMachine = Server.getRemoteAddress(); - if (clientMachine == null) { - clientMachine = ""; - } - return clientMachine; - } }