Author: arp
Date: Fri Feb 28 19:48:31 2014
New Revision: 1573052

URL: http://svn.apache.org/r1573052
Log:
HADOOP-10285. Admin interface to swap callqueue at runtime. (Contributed by 
Chris Li)

Modified:
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1573052&r1=1573051&r2=1573052&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
 Fri Feb 28 19:48:31 2014
@@ -75,6 +75,9 @@ import org.apache.hadoop.security.protoc
 import 
org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
 import 
org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
+import 
org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import 
org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
@@ -252,13 +255,16 @@ public class NameNodeProxies {
     } else if (xface == RefreshAuthorizationPolicyProtocol.class) {
       proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
           conf, ugi);
+    } else if (xface == RefreshCallQueueProtocol.class) {
+      proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
     } else {
-      String message = "Upsupported protocol found when creating the proxy " +
+      String message = "Unsupported protocol found when creating the proxy " +
           "connection to NameNode: " +
           ((xface != null) ? xface.getClass().getName() : "null");
       LOG.error(message);
       throw new IllegalStateException(message);
     }
+
     return new ProxyAndInfo<T>(proxy, dtService);
   }
   
@@ -286,6 +292,14 @@ public class NameNodeProxies {
     return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
   }
 
+  private static RefreshCallQueueProtocol
+      createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address,
+          Configuration conf, UserGroupInformation ugi) throws IOException {
+    RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)
+        createNameNodeProxy(address, conf, ugi, 
RefreshCallQueueProtocolPB.class, 0);
+    return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
+  }
+
   private static GetUserMappingsProtocol 
createNNProxyWithGetUserMappingsProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
       throws IOException {

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1573052&r1=1573051&r2=1573052&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
 Fri Feb 28 19:48:31 2014
@@ -56,6 +56,7 @@ import org.apache.hadoop.security.Refres
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -224,6 +225,8 @@ public class NameNode implements NameNod
       return RefreshAuthorizationPolicyProtocol.versionID;
     } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){
       return RefreshUserMappingsProtocol.versionID;
+    } else if (protocol.equals(RefreshCallQueueProtocol.class.getName())) {
+      return RefreshCallQueueProtocol.versionID;
     } else if (protocol.equals(GetUserMappingsProtocol.class.getName())){
       return GetUserMappingsProtocol.versionID;
     } else {

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1573052&r1=1573051&r2=1573052&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 Fri Feb 28 19:48:31 2014
@@ -138,6 +138,9 @@ import org.apache.hadoop.security.protoc
 import 
org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
 import 
org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
+import 
org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
+import 
org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import 
org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
@@ -215,6 +218,11 @@ class NameNodeRpcServer implements Namen
     BlockingService refreshUserMappingService = 
RefreshUserMappingsProtocolService
         .newReflectiveBlockingService(refreshUserMappingXlator);
 
+    RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator = 
+        new RefreshCallQueueProtocolServerSideTranslatorPB(this);
+    BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
+        .newReflectiveBlockingService(refreshCallQueueXlator);
+
     GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = 
         new GetUserMappingsProtocolServerSideTranslatorPB(this);
     BlockingService getUserMappingService = GetUserMappingsProtocolService
@@ -261,6 +269,9 @@ class NameNodeRpcServer implements Namen
           refreshAuthService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, 
           refreshUserMappingService, serviceRpcServer);
+      // We support Refreshing call queue here in case the client RPC queue is 
full
+      DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+          refreshCallQueueService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
           getUserMappingService, serviceRpcServer);
   
@@ -303,6 +314,8 @@ class NameNodeRpcServer implements Namen
         refreshAuthService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, 
         refreshUserMappingService, clientRpcServer);
+    DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+        refreshCallQueueService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
         getUserMappingService, clientRpcServer);
 
@@ -1095,6 +1108,17 @@ class NameNodeRpcServer implements Namen
 
     ProxyUsers.refreshSuperUserGroupsConfiguration();
   }
+
+  @Override // RefreshCallQueueProtocol
+  public void refreshCallQueue() {
+    LOG.info("Refreshing call queue.");
+
+    Configuration conf = new Configuration();
+    clientRpcServer.refreshCallQueue(conf);
+    if (this.serviceRpcServer != null) {
+      serviceRpcServer.refreshCallQueue(conf);
+    }
+  }
   
   @Override // GetUserMappingsProtocol
   public String[] getGroupsForUser(String user) throws IOException {

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java?rev=1573052&r1=1573051&r2=1573052&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
 Fri Feb 28 19:48:31 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 /** The full set of RPC methods implemented by the Namenode.  */
@@ -33,6 +34,7 @@ public interface NamenodeProtocols
           NamenodeProtocol,
           RefreshAuthorizationPolicyProtocol,
           RefreshUserMappingsProtocol,
+          RefreshCallQueueProtocol,
           GetUserMappingsProtocol,
           HAServiceProtocol {
 }

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1573052&r1=1573051&r2=1573052&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
 Fri Feb 28 19:48:31 2014
@@ -62,6 +62,7 @@ import org.apache.hadoop.security.Refres
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -580,6 +581,7 @@ public class DFSAdmin extends FsShell {
       "\t[-refreshServiceAcl]\n" +
       "\t[-refreshUserToGroupsMappings]\n" +
       "\t[refreshSuperUserGroupsConfiguration]\n" +
+      "\t[-refreshCallQueue]\n" +
       "\t[-printTopology]\n" +
       "\t[-refreshNamenodes datanodehost:port]\n"+
       "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
@@ -649,6 +651,8 @@ public class DFSAdmin extends FsShell {
     String refreshSuperUserGroupsConfiguration = 
       "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups 
mappings\n";
 
+    String refreshCallQueue = "-refreshCallQueue: Reload the call queue from 
config\n";
+
     String printTopology = "-printTopology: Print a tree of the racks and 
their\n" +
                            "\t\tnodes as reported by the Namenode\n";
     
@@ -717,6 +721,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshUserToGroupsMappings);
     } else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) {
       System.out.println(refreshSuperUserGroupsConfiguration);
+    } else if ("refreshCallQueue".equals(cmd)) {
+      System.out.println(refreshCallQueue);
     } else if ("printTopology".equals(cmd)) {
       System.out.println(printTopology);
     } else if ("refreshNamenodes".equals(cmd)) {
@@ -750,6 +756,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshServiceAcl);
       System.out.println(refreshUserToGroupsMappings);
       System.out.println(refreshSuperUserGroupsConfiguration);
+      System.out.println(refreshCallQueue);
       System.out.println(printTopology);
       System.out.println(refreshNamenodes);
       System.out.println(deleteBlockPool);
@@ -939,6 +946,27 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  public int refreshCallQueue() throws IOException {
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // for security authorization
+    // server principal for this call   
+    // should be NN's one.
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, 
+        conf.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, ""));
+ 
+    // Create the client
+    RefreshCallQueueProtocol refreshProtocol =
+      NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
+          RefreshCallQueueProtocol.class).getProxy();
+
+    // Refresh the user-to-groups mappings
+    refreshProtocol.refreshCallQueue();
+    
+    return 0;
+  }
+
   /**
    * Displays format of commands.
    * @param cmd The command that is being executed.
@@ -995,6 +1023,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-refreshSuperUserGroupsConfiguration]");
+    } else if ("-refreshCallQueue".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-refreshCallQueue]");
     } else if ("-printTopology".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-printTopology]");
@@ -1026,6 +1057,7 @@ public class DFSAdmin extends FsShell {
       System.err.println("           [-refreshServiceAcl]");
       System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshSuperUserGroupsConfiguration]");
+      System.err.println("           [-refreshCallQueue]");
       System.err.println("           [-printTopology]");
       System.err.println("           [-refreshNamenodes datanodehost:port]");
       System.err.println("           [-deleteBlockPool datanode-host:port 
blockpoolId [force]]");
@@ -1197,6 +1229,8 @@ public class DFSAdmin extends FsShell {
         exitCode = refreshUserToGroupsMappings();
       } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
         exitCode = refreshSuperUserGroupsConfiguration();
+      } else if ("-refreshCallQueue".equals(cmd)) {
+        exitCode = refreshCallQueue();
       } else if ("-printTopology".equals(cmd)) {
         exitCode = printTopology();
       } else if ("-refreshNamenodes".equals(cmd)) {

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java?rev=1573052&r1=1573051&r2=1573052&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java
 Fri Feb 28 19:48:31 2014
@@ -39,6 +39,8 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import 
org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
 import 
org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
+import 
org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import 
org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
 import org.junit.AfterClass;
@@ -169,4 +171,15 @@ public class TestIsMethodSupported {
     assertTrue(
         translator.isMethodSupported("refreshUserToGroupsMappings"));
   }
+
+  @Test
+  public void testRefreshCallQueueProtocol() throws IOException {
+    RefreshCallQueueProtocolClientSideTranslatorPB translator =
+        (RefreshCallQueueProtocolClientSideTranslatorPB)
+        NameNodeProxies.createNonHAProxy(conf, nnAddress,
+            RefreshCallQueueProtocol.class,
+            UserGroupInformation.getCurrentUser(), true).getProxy();
+    assertTrue(
+        translator.isMethodSupported("refreshCallQueue"));
+  }
 }


Reply via email to