Author: todd
Date: Tue Sep  6 00:41:47 2011
New Revision: 1165463

URL: http://svn.apache.org/viewvc?rev=1165463&view=rev
Log:
HDFS-2197. Refactor RPC call implementations out of NameNode class. Contributed by Todd Lipcon.

Added:
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeJspHelper.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1165463&r1=1165462&r2=1165463&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Sep  6 00:41:47 2011
@@ -11,6 +11,7 @@ Trunk (unreleased changes)
 
     HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
                HdfsConstants. (Harsh J Chouraria via atm)
+    HDFS-2197. Refactor RPC call implementations out of NameNode class (todd)
 
   BUG FIXES
     HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1165463&r1=1165462&r2=1165463&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
 Tue Sep  6 00:41:47 2011
@@ -52,7 +52,7 @@ import org.apache.hadoop.net.NetUtils;
  * </ol>
  */
 @InterfaceAudience.Private
-public class BackupNode extends NameNode implements JournalProtocol {
+public class BackupNode extends NameNode {
   private static final String BN_ADDRESS_NAME_KEY = 
DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
   private static final String BN_ADDRESS_DEFAULT = 
DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT;
   private static final String BN_HTTP_ADDRESS_NAME_KEY = 
DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
@@ -74,7 +74,6 @@ public class BackupNode extends NameNode
   
   BackupNode(Configuration conf, NamenodeRole role) throws IOException {
     super(conf, role);
-    this.server.addProtocol(JournalProtocol.class, this);
   }
 
   /////////////////////////////////////////////////////
@@ -96,18 +95,20 @@ public class BackupNode extends NameNode
   }
 
   @Override // NameNode
-  protected void setRpcServerAddress(Configuration conf) {
-    conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(rpcAddress));
+  protected void setRpcServerAddress(Configuration conf,
+      InetSocketAddress addr) {
+    conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(addr));
   }
   
   @Override // Namenode
-  protected void setRpcServiceServerAddress(Configuration conf) {
-    conf.set(BN_SERVICE_RPC_ADDRESS_KEY, getHostPortString(serviceRPCAddress));
+  protected void setRpcServiceServerAddress(Configuration conf,
+      InetSocketAddress addr) {
+    conf.set(BN_SERVICE_RPC_ADDRESS_KEY,  getHostPortString(addr));
   }
 
   @Override // NameNode
   protected InetSocketAddress getHttpServerAddress(Configuration conf) {
-    assert rpcAddress != null : "rpcAddress should be calculated first";
+    assert getNameNodeAddress() != null : "rpcAddress should be calculated first";
     String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT);
     return NetUtils.createSocketAddr(addr);
   }
@@ -146,6 +147,12 @@ public class BackupNode extends NameNode
     runCheckpointDaemon(conf);
   }
 
+  @Override
+  protected NameNodeRpcServer createRpcServer(Configuration conf)
+      throws IOException {
+    return new BackupNodeRpcServer(conf, this);
+  }
+
   @Override // NameNode
   public void stop() {
     if(checkpointManager != null) {
@@ -178,74 +185,84 @@ public class BackupNode extends NameNode
     super.stop();
   }
 
-  
-  @Override
-  public long getProtocolVersion(String protocol, long clientVersion)
-      throws IOException {
-    if (protocol.equals(JournalProtocol.class.getName())) {
-      return JournalProtocol.versionID;
-    } else {
-      return super.getProtocolVersion(protocol, clientVersion);
+  static class BackupNodeRpcServer extends NameNodeRpcServer implements JournalProtocol {
+    private final String nnRpcAddress;
+    
+    private BackupNodeRpcServer(Configuration conf, BackupNode nn)
+        throws IOException {
+      super(conf, nn);
+      this.server.addProtocol(JournalProtocol.class, this);
+      nnRpcAddress = nn.nnRpcAddress;
     }
-  }
 
-  /////////////////////////////////////////////////////
-  // NamenodeProtocol implementation for backup node.
-  /////////////////////////////////////////////////////
-  @Override // NamenodeProtocol
-  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
-  throws IOException {
-    throw new UnsupportedActionException("getBlocks");
-  }
-
-  // Only active name-node can register other nodes.
-  @Override // NamenodeProtocol
-  public NamenodeRegistration register(NamenodeRegistration registration
-  ) throws IOException {
-    throw new UnsupportedActionException("register");
-  }
-
-  @Override // NamenodeProtocol
-  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
-  throws IOException {
-    throw new UnsupportedActionException("startCheckpoint");
-  }
-
-  @Override // NamenodeProtocol
-  public void endCheckpoint(NamenodeRegistration registration,
-                            CheckpointSignature sig) throws IOException {
-    throw new UnsupportedActionException("endCheckpoint");
-  }  
-
-  /////////////////////////////////////////////////////
-  // BackupNodeProtocol implementation for backup node.
-  /////////////////////////////////////////////////////
-
-  @Override
-  public void journal(NamenodeRegistration nnReg,
-      long firstTxId, int numTxns,
-      byte[] records) throws IOException {
-    verifyRequest(nnReg);
-    if(!nnRpcAddress.equals(nnReg.getAddress()))
-      throw new IOException("Journal request from unexpected name-node: "
-          + nnReg.getAddress() + " expecting " + nnRpcAddress);
-    getBNImage().journal(firstTxId, numTxns, records);
-  }
-
-  @Override
-  public void startLogSegment(NamenodeRegistration registration, long txid)
-      throws IOException {
-    verifyRequest(registration);
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      if (protocol.equals(JournalProtocol.class.getName())) {
+        return JournalProtocol.versionID;
+      } else {
+        return super.getProtocolVersion(protocol, clientVersion);
+      }
+    }
   
-    getBNImage().namenodeStartedLogSegment(txid);
-  }
-
-  //////////////////////////////////////////////////////
+    /////////////////////////////////////////////////////
+    // NamenodeProtocol implementation for backup node.
+    /////////////////////////////////////////////////////
+    @Override // NamenodeProtocol
+    public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+    throws IOException {
+      throw new UnsupportedActionException("getBlocks");
+    }
+  
+    // Only active name-node can register other nodes.
+    @Override // NamenodeProtocol
+    public NamenodeRegistration register(NamenodeRegistration registration
+    ) throws IOException {
+      throw new UnsupportedActionException("register");
+    }
+  
+    @Override // NamenodeProtocol
+    public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+    throws IOException {
+      throw new UnsupportedActionException("startCheckpoint");
+    }
   
+    @Override // NamenodeProtocol
+    public void endCheckpoint(NamenodeRegistration registration,
+                              CheckpointSignature sig) throws IOException {
+      throw new UnsupportedActionException("endCheckpoint");
+    }  
   
-  BackupImage getBNImage() {
-    return (BackupImage)getFSImage();
+    /////////////////////////////////////////////////////
+    // BackupNodeProtocol implementation for backup node.
+    /////////////////////////////////////////////////////
+  
+    @Override
+    public void journal(NamenodeRegistration nnReg,
+        long firstTxId, int numTxns,
+        byte[] records) throws IOException {
+      verifyRequest(nnReg);
+      if(!nnRpcAddress.equals(nnReg.getAddress()))
+        throw new IOException("Journal request from unexpected name-node: "
+            + nnReg.getAddress() + " expecting " + rpcAddress);
+      getBNImage().journal(firstTxId, numTxns, records);
+    }
+  
+    @Override
+    public void startLogSegment(NamenodeRegistration registration, long txid)
+        throws IOException {
+      verifyRequest(registration);
+    
+      getBNImage().namenodeStartedLogSegment(txid);
+    }
+    
+    private BackupImage getBNImage() {
+      return (BackupImage)nn.getFSImage();
+    }
   }
+  
+  //////////////////////////////////////////////////////
+  
 
   boolean shouldCheckpointAtStartup() {
     FSImage fsImage = getFSImage();

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java?rev=1165463&r1=1165462&r2=1165463&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
 Tue Sep  6 00:41:47 2011
@@ -69,7 +69,7 @@ public class CancelDelegationTokenServle
     try {
       ugi.doAs(new PrivilegedExceptionAction<Void>() {
         public Void run() throws Exception {
-          nn.cancelDelegationToken(token);
+          nn.getRpcServer().cancelDelegationToken(token);
           return null;
         }
       });

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java?rev=1165463&r1=1165462&r2=1165463&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
 Tue Sep  6 00:41:47 2011
@@ -73,7 +73,7 @@ abstract class DfsServlet extends HttpSe
     // rpc
     NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
     if (nn != null) {
-      return nn;
+      return nn.getRpcServer();
     }
     InetSocketAddress nnAddr =
       NameNodeHttpServer.getNameNodeAddressFromContext(context);

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java?rev=1165463&r1=1165462&r2=1165463&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
 Tue Sep  6 00:41:47 2011
@@ -75,7 +75,7 @@ public class GetDelegationTokenServlet e
           + ":" + NameNode.getAddress(conf).getPort();
 
           Token<DelegationTokenIdentifier> token = 
-            nn.getDelegationToken(new Text(renewerFinal));
+            nn.getRpcServer().getDelegationToken(new Text(renewerFinal));
           if(token == null) {
             throw new Exception("couldn't get the token for " +s);
           }

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1165463&r1=1165462&r2=1165463&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
 Tue Sep  6 00:41:47 2011
@@ -17,13 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -32,79 +33,21 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_LENGTH;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_DEPTH;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Groups;
-import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 
@@ -146,7 +89,7 @@ import org.apache.hadoop.util.StringUtil
  * NameNode state, for example partial blocksMap etc.
  **********************************************************/
 @InterfaceAudience.Private
-public class NameNode implements NamenodeProtocols {
+public class NameNode {
   static{
     HdfsConfiguration.init();
   }
@@ -179,32 +122,6 @@ public class NameNode implements Namenod
     DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY
   };
   
-  public long getProtocolVersion(String protocol, 
-                                 long clientVersion) throws IOException {
-    if (protocol.equals(ClientProtocol.class.getName())) {
-      return ClientProtocol.versionID; 
-    } else if (protocol.equals(DatanodeProtocol.class.getName())){
-      return DatanodeProtocol.versionID;
-    } else if (protocol.equals(NamenodeProtocol.class.getName())){
-      return NamenodeProtocol.versionID;
-    } else if 
(protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
-      return RefreshAuthorizationPolicyProtocol.versionID;
-    } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){
-      return RefreshUserMappingsProtocol.versionID;
-    } else if (protocol.equals(GetUserMappingsProtocol.class.getName())){
-      return GetUserMappingsProtocol.versionID;
-    } else {
-      throw new IOException("Unknown protocol to name node: " + protocol);
-    }
-  }
-    
-
-  @Override // VersionedProtocol
-  public ProtocolSignature getProtocolSignature(String protocol,
-      long clientVersion, int clientMethodsHash) throws IOException {
-    return ProtocolSignature.getProtocolSignature(
-        this, protocol, clientVersion, clientMethodsHash);
-  }
 
   public static final int DEFAULT_PORT = 8020;
 
@@ -213,18 +130,6 @@ public class NameNode implements Namenod
   
   protected FSNamesystem namesystem; 
   protected NamenodeRole role;
-  /** RPC server. Package-protected for use in tests. */
-  RPC.Server server;
-  /** RPC server for HDFS Services communication.
-      BackupNode, Datanodes and all other services
-      should be connecting to this server if it is
-      configured. Clients should only go to NameNode#server
-  */
-  protected Server serviceRpcServer;
-  /** RPC server address */
-  protected InetSocketAddress rpcAddress = null;
-  /** RPC server for DN address */
-  protected InetSocketAddress serviceRPCAddress = null;
   /** httpServer */
   protected NameNodeHttpServer httpServer;
   private Thread emptier;
@@ -232,11 +137,11 @@ public class NameNode implements Namenod
   protected boolean stopRequested = false;
   /** Registration information of this name-node  */
   protected NamenodeRegistration nodeRegistration;
-  /** Is service level authorization enabled? */
-  private boolean serviceAuthEnabled = false;
   /** Activated plug-ins. */
   private List<ServicePlugin> plugins;
   
+  private NameNodeRpcServer rpcServer;
+  
   /** Format a new filesystem.  Destroys any filesystem that may already
    * exist at this location.  **/
   public static void format(Configuration conf) throws IOException {
@@ -252,6 +157,10 @@ public class NameNode implements Namenod
     return namesystem;
   }
 
+  public NamenodeProtocols getRpcServer() {
+    return rpcServer;
+  }
+  
   static void initMetrics(Configuration conf, NamenodeRole role) {
     metrics = NameNodeMetrics.create(conf, role);
   }
@@ -359,11 +268,13 @@ public class NameNode implements Namenod
   /**
    * Modifies the configuration passed to contain the service rpc address 
setting
    */
-  protected void setRpcServiceServerAddress(Configuration conf) {
+  protected void setRpcServiceServerAddress(Configuration conf,
+      InetSocketAddress serviceRPCAddress) {
     setServiceAddress(conf, getHostPortString(serviceRPCAddress));
   }
 
-  protected void setRpcServerAddress(Configuration conf) {
+  protected void setRpcServerAddress(Configuration conf,
+      InetSocketAddress rpcAddress) {
     FileSystem.setDefaultUri(conf, getUri(rpcAddress));
   }
 
@@ -387,7 +298,7 @@ public class NameNode implements Namenod
 
   NamenodeRegistration setRegistration() {
     nodeRegistration = new NamenodeRegistration(
-        getHostPortString(rpcAddress),
+        getHostPortString(rpcServer.getRpcAddress()),
         getHostPortString(getHttpAddress()),
         getFSImage().getStorage(), getRole());
     return nodeRegistration;
@@ -408,45 +319,13 @@ public class NameNode implements Namenod
    * @param conf the configuration
    */
   protected void initialize(Configuration conf) throws IOException {
-    InetSocketAddress socAddr = getRpcServerAddress(conf);
     UserGroupInformation.setConfiguration(conf);
     loginAsNameNodeUser(conf);
-    int handlerCount = 
-      conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, 
-                  DFS_DATANODE_HANDLER_COUNT_DEFAULT);
 
     NameNode.initMetrics(conf, this.getRole());
     loadNamesystem(conf);
-    // create rpc server
-    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
-    if (dnSocketAddr != null) {
-      int serviceHandlerCount =
-        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
-                    DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
-      this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
-          dnSocketAddr.getHostName(), dnSocketAddr.getPort(), 
serviceHandlerCount,
-          false, conf, namesystem.getDelegationTokenSecretManager());
-      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
-      setRpcServiceServerAddress(conf);
-    }
-    this.server = RPC.getServer(NamenodeProtocols.class, this,
-                                socAddr.getHostName(), socAddr.getPort(),
-                                handlerCount, false, conf, 
-                                namesystem.getDelegationTokenSecretManager());
-
-    // set service-level authorization security policy
-    if (serviceAuthEnabled =
-          conf.getBoolean(
-            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
-      this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
-      if (this.serviceRpcServer != null) {
-        this.serviceRpcServer.refreshServiceAcl(conf, new 
HDFSPolicyProvider());
-      }
-    }
 
-    // The rpc-server port can be ephemeral... ensure we have the correct info
-    this.rpcAddress = this.server.getListenerAddress(); 
-    setRpcServerAddress(conf);
+    rpcServer = createRpcServer(conf);
     
     try {
       validateConfigurationSettings(conf);
@@ -456,13 +335,22 @@ public class NameNode implements Namenod
     }
 
     activate(conf);
-    LOG.info(getRole() + " up at: " + rpcAddress);
-    if (serviceRPCAddress != null) {
-      LOG.info(getRole() + " service server is up at: " + serviceRPCAddress); 
+    LOG.info(getRole() + " up at: " + rpcServer.getRpcAddress());
+    if (rpcServer.getServiceRpcAddress() != null) {
+      LOG.info(getRole() + " service server is up at: " + rpcServer.getServiceRpcAddress());
     }
   }
   
   /**
+   * Create the RPC server implementation. Used as an extension point for the
+   * BackupNode.
+   */
+  protected NameNodeRpcServer createRpcServer(Configuration conf)
+      throws IOException {
+    return new NameNodeRpcServer(conf, this);
+  }
+
+  /**
    * Verifies that the final Configuration Settings look ok for the NameNode to
    * properly start up
    * Things to check for include:
@@ -494,10 +382,7 @@ public class NameNode implements Namenod
     }
     namesystem.activate(conf);
     startHttpServer(conf);
-    server.start();  //start RPC server
-    if (serviceRpcServer != null) {
-      serviceRpcServer.start();      
-    }
+    rpcServer.start();
     startTrashEmptier(conf);
     
     plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
@@ -577,7 +462,7 @@ public class NameNode implements Namenod
    */
   public void join() {
     try {
-      this.server.join();
+      this.rpcServer.join();
     } catch (InterruptedException ie) {
     }
   }
@@ -607,8 +492,7 @@ public class NameNode implements Namenod
     }
     if(namesystem != null) namesystem.close();
     if(emptier != null) emptier.interrupt();
-    if(server != null) server.stop();
-    if(serviceRpcServer != null) serviceRpcServer.stop();
+    if(rpcServer != null) rpcServer.stop();
     if (metrics != null) {
       metrics.shutdown();
     }
@@ -621,410 +505,6 @@ public class NameNode implements Namenod
     return stopRequested;
   }
 
-  /////////////////////////////////////////////////////
-  // NamenodeProtocol
-  /////////////////////////////////////////////////////
-  @Override // NamenodeProtocol
-  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
-  throws IOException {
-    if(size <= 0) {
-      throw new IllegalArgumentException(
-        "Unexpected not positive size: "+size);
-    }
-
-    return namesystem.getBlockManager().getBlocks(datanode, size); 
-  }
-
-  @Override // NamenodeProtocol
-  public ExportedBlockKeys getBlockKeys() throws IOException {
-    return namesystem.getBlockManager().getBlockKeys();
-  }
-
-  @Override // NamenodeProtocol
-  public void errorReport(NamenodeRegistration registration,
-                          int errorCode, 
-                          String msg) throws IOException {
-    verifyRequest(registration);
-    LOG.info("Error report from " + registration + ": " + msg);
-    if(errorCode == FATAL)
-      namesystem.releaseBackupNode(registration);
-  }
-
-  @Override // NamenodeProtocol
-  public NamenodeRegistration register(NamenodeRegistration registration)
-  throws IOException {
-    verifyVersion(registration.getVersion());
-    NamenodeRegistration myRegistration = setRegistration();
-    namesystem.registerBackupNode(registration, myRegistration);
-    return myRegistration;
-  }
-
-  @Override // NamenodeProtocol
-  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
-  throws IOException {
-    verifyRequest(registration);
-    if(!isRole(NamenodeRole.NAMENODE))
-      throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
-    return namesystem.startCheckpoint(registration, setRegistration());
-  }
-
-  @Override // NamenodeProtocol
-  public void endCheckpoint(NamenodeRegistration registration,
-                            CheckpointSignature sig) throws IOException {
-    verifyRequest(registration);
-    if(!isRole(NamenodeRole.NAMENODE))
-      throw new IOException("Only an ACTIVE node can invoke endCheckpoint.");
-    namesystem.endCheckpoint(registration, sig);
-  }
-
-  @Override // ClientProtocol
-  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
-      throws IOException {
-    return namesystem.getDelegationToken(renewer);
-  }
-
-  @Override // ClientProtocol
-  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
-      throws InvalidToken, IOException {
-    return namesystem.renewDelegationToken(token);
-  }
-
-  @Override // ClientProtocol
-  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
-      throws IOException {
-    namesystem.cancelDelegationToken(token);
-  }
-  
-  @Override // ClientProtocol
-  public LocatedBlocks getBlockLocations(String src, 
-                                          long offset, 
-                                          long length) 
-      throws IOException {
-    metrics.incrGetBlockLocations();
-    return namesystem.getBlockLocations(getClientMachine(), 
-                                        src, offset, length);
-  }
-  
-  @Override // ClientProtocol
-  public FsServerDefaults getServerDefaults() throws IOException {
-    return namesystem.getServerDefaults();
-  }
-
-  @Override // ClientProtocol
-  public void create(String src, 
-                     FsPermission masked,
-                     String clientName, 
-                     EnumSetWritable<CreateFlag> flag,
-                     boolean createParent,
-                     short replication,
-                     long blockSize) throws IOException {
-    String clientMachine = getClientMachine();
-    if (stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.create: file "
-                         +src+" for "+clientName+" at "+clientMachine);
-    }
-    if (!checkPathLength(src)) {
-      throw new IOException("create: Pathname too long.  Limit "
-          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-    }
-    namesystem.startFile(src,
-        new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
-            null, masked),
-        clientName, clientMachine, flag.get(), createParent, replication, blockSize);
-    metrics.incrFilesCreated();
-    metrics.incrCreateFileOps();
-  }
-
-  @Override // ClientProtocol
-  public LocatedBlock append(String src, String clientName) 
-      throws IOException {
-    String clientMachine = getClientMachine();
-    if (stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.append: file "
-          +src+" for "+clientName+" at "+clientMachine);
-    }
-    LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
-    metrics.incrFilesAppended();
-    return info;
-  }
-
-  @Override // ClientProtocol
-  public boolean recoverLease(String src, String clientName) throws IOException {
-    String clientMachine = getClientMachine();
-    return namesystem.recoverLease(src, clientName, clientMachine);
-  }
-
-  @Override // ClientProtocol
-  public boolean setReplication(String src, short replication) 
-    throws IOException {  
-    return namesystem.setReplication(src, replication);
-  }
-    
-  @Override // ClientProtocol
-  public void setPermission(String src, FsPermission permissions)
-      throws IOException {
-    namesystem.setPermission(src, permissions);
-  }
-
-  @Override // ClientProtocol
-  public void setOwner(String src, String username, String groupname)
-      throws IOException {
-    namesystem.setOwner(src, username, groupname);
-  }
-
-  @Override // ClientProtocol
-  public LocatedBlock addBlock(String src,
-                               String clientName,
-                               ExtendedBlock previous,
-                               DatanodeInfo[] excludedNodes)
-      throws IOException {
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
-          +src+" for "+clientName);
-    }
-    HashMap<Node, Node> excludedNodesSet = null;
-    if (excludedNodes != null) {
-      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
-      for (Node node:excludedNodes) {
-        excludedNodesSet.put(node, node);
-      }
-    }
-    LocatedBlock locatedBlock = 
-      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
-    if (locatedBlock != null)
-      metrics.incrAddBlockOps();
-    return locatedBlock;
-  }
-
-  @Override // ClientProtocol
-  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
-      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
-      final int numAdditionalNodes, final String clientName
-      ) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getAdditionalDatanode: src=" + src
-          + ", blk=" + blk
-          + ", existings=" + Arrays.asList(existings)
-          + ", excludes=" + Arrays.asList(excludes)
-          + ", numAdditionalNodes=" + numAdditionalNodes
-          + ", clientName=" + clientName);
-    }
-
-    metrics.incrGetAdditionalDatanodeOps();
-
-    HashMap<Node, Node> excludeSet = null;
-    if (excludes != null) {
-      excludeSet = new HashMap<Node, Node>(excludes.length);
-      for (Node node : excludes) {
-        excludeSet.put(node, node);
-      }
-    }
-    return namesystem.getAdditionalDatanode(src, blk,
-        existings, excludeSet, numAdditionalNodes, clientName);
-  }
-
-  /**
-   * The client needs to give up on the block.
-   */
-  public void abandonBlock(ExtendedBlock b, String src, String holder)
-      throws IOException {
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
-          +b+" of file "+src);
-    }
-    if (!namesystem.abandonBlock(b, src, holder)) {
-      throw new IOException("Cannot abandon block during write to " + src);
-    }
-  }
-
-  @Override // ClientProtocol
-  public boolean complete(String src, String clientName, ExtendedBlock last)
-      throws IOException {
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.complete: "
-          + src + " for " + clientName);
-    }
-    return namesystem.completeFile(src, clientName, last);
-  }
-
-  /**
-   * The client has detected an error on the specified located blocks 
-   * and is reporting them to the server.  For now, the namenode will 
-   * mark the block as corrupt.  In the future we might 
-   * check the blocks are actually corrupt. 
-   */
-  @Override
-  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
-    stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
-    for (int i = 0; i < blocks.length; i++) {
-      ExtendedBlock blk = blocks[i].getBlock();
-      DatanodeInfo[] nodes = blocks[i].getLocations();
-      for (int j = 0; j < nodes.length; j++) {
-        DatanodeInfo dn = nodes[j];
-        namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn);
-      }
-    }
-  }
-
-  @Override // ClientProtocol
-  public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
-      throws IOException {
-    return namesystem.updateBlockForPipeline(block, clientName);
-  }
-
-
-  @Override // ClientProtocol
-  public void updatePipeline(String clientName, ExtendedBlock oldBlock,
-      ExtendedBlock newBlock, DatanodeID[] newNodes)
-      throws IOException {
-    namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
-  }
-  
-  @Override // DatanodeProtocol
-  public void commitBlockSynchronization(ExtendedBlock block,
-      long newgenerationstamp, long newlength,
-      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
-      throws IOException {
-    namesystem.commitBlockSynchronization(block,
-        newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
-  }
-  
-  @Override // ClientProtocol
-  public long getPreferredBlockSize(String filename) 
-      throws IOException {
-    return namesystem.getPreferredBlockSize(filename);
-  }
-    
-  @Deprecated
-  @Override // ClientProtocol
-  public boolean rename(String src, String dst) throws IOException {
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
-    }
-    if (!checkPathLength(dst)) {
-      throw new IOException("rename: Pathname too long.  Limit "
-          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-    }
-    boolean ret = namesystem.renameTo(src, dst);
-    if (ret) {
-      metrics.incrFilesRenamed();
-    }
-    return ret;
-  }
-  
-  @Override // ClientProtocol
-  public void concat(String trg, String[] src) throws IOException {
-    namesystem.concat(trg, src);
-  }
-  
-  @Override // ClientProtocol
-  public void rename(String src, String dst, Options.Rename... options)
-      throws IOException {
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
-    }
-    if (!checkPathLength(dst)) {
-      throw new IOException("rename: Pathname too long.  Limit "
-          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-    }
-    namesystem.renameTo(src, dst, options);
-    metrics.incrFilesRenamed();
-  }
-
-  @Deprecated
-  @Override // ClientProtocol
-  public boolean delete(String src) throws IOException {
-    return delete(src, true);
-  }
-
-  @Override // ClientProtocol
-  public boolean delete(String src, boolean recursive) throws IOException {
-    if (stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
-          + ", recursive=" + recursive);
-    }
-    boolean ret = namesystem.delete(src, recursive);
-    if (ret) 
-      metrics.incrDeleteFileOps();
-    return ret;
-  }
-
-  /**
-   * Check path length does not exceed maximum.  Returns true if
-   * length and depth are okay.  Returns false if length is too long 
-   * or depth is too great.
-   */
-  private boolean checkPathLength(String src) {
-    Path srcPath = new Path(src);
-    return (src.length() <= MAX_PATH_LENGTH &&
-            srcPath.depth() <= MAX_PATH_DEPTH);
-  }
-    
-  @Override // ClientProtocol
-  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
-      throws IOException {
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
-    }
-    if (!checkPathLength(src)) {
-      throw new IOException("mkdirs: Pathname too long.  Limit " 
-                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-    }
-    return namesystem.mkdirs(src,
-        new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
-            null, masked), createParent);
-  }
-
-  @Override // ClientProtocol
-  public void renewLease(String clientName) throws IOException {
-    namesystem.renewLease(clientName);        
-  }
-
-  @Override // ClientProtocol
-  public DirectoryListing getListing(String src, byte[] startAfter,
-      boolean needLocation)
-  throws IOException {
-    DirectoryListing files = namesystem.getListing(
-        src, startAfter, needLocation);
-    if (files != null) {
-      metrics.incrGetListingOps();
-      metrics.incrFilesInGetListingOps(files.getPartialListing().length);
-    }
-    return files;
-  }
-
-  @Override // ClientProtocol
-  public HdfsFileStatus getFileInfo(String src)  throws IOException {
-    metrics.incrFileInfoOps();
-    return namesystem.getFileInfo(src, true);
-  }
-
-  @Override // ClientProtocol
-  public HdfsFileStatus getFileLinkInfo(String src) throws IOException { 
-    metrics.incrFileInfoOps();
-    return namesystem.getFileInfo(src, false);
-  }
-  
-  @Override
-  public long[] getStats() {
-    return namesystem.getStats();
-  }
-
-  @Override // ClientProtocol
-  public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
-      throws IOException {
-    DatanodeInfo results[] = namesystem.datanodeReport(type);
-    if (results == null ) {
-      throw new IOException("Cannot find datanode report");
-    }
-    return results;
-  }
-    
-  @Override // ClientProtocol
-  public boolean setSafeMode(SafeModeAction action) throws IOException {
-    return namesystem.setSafeMode(action);
-  }
-
   /**
    * Is the cluster currently in safe mode?
    */
@@ -1032,257 +512,6 @@ public class NameNode implements Namenod
     return namesystem.isInSafeMode();
   }
 
-  @Override // ClientProtocol
-  public boolean restoreFailedStorage(String arg) 
-      throws AccessControlException {
-    return namesystem.restoreFailedStorage(arg);
-  }
-
-  @Override // ClientProtocol
-  public void saveNamespace() throws IOException {
-    namesystem.saveNamespace();
-  }
-
-  @Override // ClientProtocol
-  public void refreshNodes() throws IOException {
-    namesystem.getBlockManager().getDatanodeManager().refreshNodes(
-        new HdfsConfiguration());
-  }
-
-  @Override // NamenodeProtocol
-  public long getTransactionID() {
-    return namesystem.getEditLog().getSyncTxId();
-  }
-
-  @Override // NamenodeProtocol
-  public CheckpointSignature rollEditLog() throws IOException {
-    return namesystem.rollEditLog();
-  }
-  
-  @Override
-  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
-  throws IOException {
-    return namesystem.getEditLog().getEditLogManifest(sinceTxId);
-  }
-    
-  @Override // ClientProtocol
-  public void finalizeUpgrade() throws IOException {
-    namesystem.finalizeUpgrade();
-  }
-
-  @Override // ClientProtocol
-  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
-      throws IOException {
-    return namesystem.distributedUpgradeProgress(action);
-  }
-
-  @Override // ClientProtocol
-  public void metaSave(String filename) throws IOException {
-    namesystem.metaSave(filename);
-  }
-
-  @Override // ClientProtocol
-  public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
-      throws IOException {
-    Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
-      namesystem.listCorruptFileBlocks(path, cookie);
-    
-    String[] files = new String[fbs.size()];
-    String lastCookie = "";
-    int i = 0;
-    for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
-      files[i++] = fb.path;
-      lastCookie = fb.block.getBlockName();
-    }
-    return new CorruptFileBlocks(files, lastCookie);
-  }
-
-  /**
-   * Tell all datanodes to use a new, non-persistent bandwidth value for
-   * dfs.datanode.balance.bandwidthPerSec.
-   * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
-   * @throws IOException
-   */
-  @Override // ClientProtocol
-  public void setBalancerBandwidth(long bandwidth) throws IOException {
-    namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
-  }
-  
-  @Override // ClientProtocol
-  public ContentSummary getContentSummary(String path) throws IOException {
-    return namesystem.getContentSummary(path);
-  }
-
-  @Override // ClientProtocol
-  public void setQuota(String path, long namespaceQuota, long diskspaceQuota) 
-      throws IOException {
-    namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
-  }
-  
-  @Override // ClientProtocol
-  public void fsync(String src, String clientName) throws IOException {
-    namesystem.fsync(src, clientName);
-  }
-
-  @Override // ClientProtocol
-  public void setTimes(String src, long mtime, long atime) 
-      throws IOException {
-    namesystem.setTimes(src, mtime, atime);
-  }
-
-  @Override // ClientProtocol
-  public void createSymlink(String target, String link, FsPermission dirPerms,
-      boolean createParent) throws IOException {
-    metrics.incrCreateSymlinkOps();
-    /* We enforce the MAX_PATH_LENGTH limit even though a symlink target 
-     * URI may refer to a non-HDFS file system. 
-     */
-    if (!checkPathLength(link)) {
-      throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH +
-                            " character limit");
-                            
-    }
-    if ("".equals(target)) {
-      throw new IOException("Invalid symlink target");
-    }
-    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    namesystem.createSymlink(target, link,
-      new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
-  }
-
-  @Override // ClientProtocol
-  public String getLinkTarget(String path) throws IOException {
-    metrics.incrGetLinkTargetOps();
-    /* Resolves the first symlink in the given path, returning a
-     * new path consisting of the target of the symlink and any 
-     * remaining path components from the original path.
-     */
-    try {
-      HdfsFileStatus stat = namesystem.getFileInfo(path, false);
-      if (stat != null) {
-        // NB: getSymlink throws IOException if !stat.isSymlink() 
-        return stat.getSymlink();
-      }
-    } catch (UnresolvedPathException e) {
-      return e.getResolvedPath().toString();
-    } catch (UnresolvedLinkException e) {
-      // The NameNode should only throw an UnresolvedPathException
-      throw new AssertionError("UnresolvedLinkException thrown");
-    }
-    return null;
-  }
-
-
-  @Override // DatanodeProtocol
-  public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
-      throws IOException {
-    verifyVersion(nodeReg.getVersion());
-    namesystem.registerDatanode(nodeReg);
-      
-    return nodeReg;
-  }
-
-  @Override // DatanodeProtocol
-  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xmitsInProgress, int xceiverCount, int failedVolumes)
-      throws IOException {
-    verifyRequest(nodeReg);
-    return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
-        blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes);
-  }
-
-  @Override // DatanodeProtocol
-  public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
-      String poolId, long[] blocks) throws IOException {
-    verifyRequest(nodeReg);
-    BlockListAsLongs blist = new BlockListAsLongs(blocks);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
-           + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
-           + " blocks");
-    }
-
-    namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
-    if (getFSImage().isUpgradeFinalized())
-      return new DatanodeCommand.Finalize(poolId);
-    return null;
-  }
-
-  @Override // DatanodeProtocol
-  public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
-      ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
-    verifyRequest(nodeReg);
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
-          +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
-          +" blocks.");
-    }
-    namesystem.getBlockManager().blockReceivedAndDeleted(
-        nodeReg, poolId, receivedAndDeletedBlocks);
-  }
-
-  @Override // DatanodeProtocol
-  public void errorReport(DatanodeRegistration nodeReg,
-                          int errorCode, String msg) throws IOException { 
-    String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName());
-
-    if (errorCode == DatanodeProtocol.NOTIFY) {
-      LOG.info("Error report from " + dnName + ": " + msg);
-      return;
-    }
-    verifyRequest(nodeReg);
-
-    if (errorCode == DatanodeProtocol.DISK_ERROR) {
-      LOG.warn("Disk error on " + dnName + ": " + msg);
-    } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
-      LOG.warn("Fatal disk error on " + dnName + ": " + msg);
-      namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);
-    } else {
-      LOG.info("Error report from " + dnName + ": " + msg);
-    }
-  }
-    
-  @Override // DatanodeProtocol, NamenodeProtocol
-  public NamespaceInfo versionRequest() throws IOException {
-    return namesystem.getNamespaceInfo();
-  }
-
-  @Override // DatanodeProtocol
-  public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
-    return namesystem.processDistributedUpgradeCommand(comm);
-  }
-
-  /** 
-   * Verify request.
-   * 
-   * Verifies correctness of the datanode version, registration ID, and 
-   * if the datanode does not need to be shutdown.
-   * 
-   * @param nodeReg data node registration
-   * @throws IOException
-   */
-  void verifyRequest(NodeRegistration nodeReg) throws IOException {
-    verifyVersion(nodeReg.getVersion());
-    if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) {
-      LOG.warn("Invalid registrationID - expected: "
-          + namesystem.getRegistrationID() + " received: "
-          + nodeReg.getRegistrationID());
-      throw new UnregisteredNodeException(nodeReg);
-    }
-  }
-    
-  /**
-   * Verify version.
-   * 
-   * @param version
-   * @throws IOException
-   */
-  void verifyVersion(int version) throws IOException {
-    if (version != HdfsConstants.LAYOUT_VERSION)
-      throw new IncorrectVersionException(version, "data node");
-  }
-    
   /** get FSImage */
   FSImage getFSImage() {
     return namesystem.dir.fsImage;
@@ -1293,7 +522,7 @@ public class NameNode implements Namenod
    * @return namenode rpc address
    */
   public InetSocketAddress getNameNodeAddress() {
-    return rpcAddress;
+    return rpcServer.getRpcAddress();
   }
   
   /**
@@ -1302,7 +531,7 @@ public class NameNode implements Namenod
    * @return namenode service rpc address used by datanodes
    */
   public InetSocketAddress getServiceRpcAddress() {
-    return serviceRPCAddress != null ? serviceRPCAddress : rpcAddress;
+    return rpcServer.getServiceRpcAddress() != null ? rpcServer.getServiceRpcAddress() : rpcServer.getRpcAddress();
   }
 
   /**
@@ -1387,40 +616,6 @@ public class NameNode implements Namenod
     return false;
   }
 
-  @Override // RefreshAuthorizationPolicyProtocol
-  public void refreshServiceAcl() throws IOException {
-    if (!serviceAuthEnabled) {
-      throw new AuthorizationException("Service Level Authorization not enabled!");
-    }
-
-    this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
-    if (this.serviceRpcServer != null) {
-      this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
-    }
-  }
-
-  @Override // RefreshAuthorizationPolicyProtocol
-  public void refreshUserToGroupsMappings() throws IOException {
-    LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + 
-             UserGroupInformation.getCurrentUser().getShortUserName());
-    Groups.getUserToGroupsMappingService().refresh();
-  }
-
-  @Override // RefreshAuthorizationPolicyProtocol
-  public void refreshSuperUserGroupsConfiguration() {
-    LOG.info("Refreshing SuperUser proxy group mapping list ");
-
-    ProxyUsers.refreshSuperUserGroupsConfiguration();
-  }
-  
-  @Override // GetUserMappingsProtocol
-  public String[] getGroupsForUser(String user) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Getting groups for user " + user);
-    }
-    return UserGroupInformation.createRemoteUser(user).getGroupNames();
-  }
-
   private static void printUsage() {
     System.err.println(
       "Usage: java NameNode [" +
@@ -1593,11 +788,4 @@ public class NameNode implements Namenod
     }
   }
   
-  private static String getClientMachine() {
-    String clientMachine = Server.getRemoteAddress();
-    if (clientMachine == null) {
-      clientMachine = "";
-    }
-    return clientMachine;
-  }
 }


Reply via email to