Author: arp Date: Fri Feb 28 19:48:31 2014 New Revision: 1573052 URL: http://svn.apache.org/r1573052 Log: HADOOP-10285. Admin interface to swap callqueue at runtime. (Contributed by Chris Li)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1573052&r1=1573051&r2=1573052&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Fri Feb 28 19:48:31 2014 @@ -75,6 +75,9 @@ import org.apache.hadoop.security.protoc import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB; import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; @@ -252,13 +255,16 @@ public class NameNodeProxies { } else if (xface == RefreshAuthorizationPolicyProtocol.class) { proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr, conf, ugi); + } else if (xface == RefreshCallQueueProtocol.class) { + proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi); } else { - String message = "Upsupported protocol found when creating the proxy " + + String message = "Unsupported protocol found when creating the proxy " + "connection to NameNode: " + ((xface != null) ? xface.getClass().getName() : "null"); LOG.error(message); throw new IllegalStateException(message); } + return new ProxyAndInfo<T>(proxy, dtService); } @@ -286,6 +292,14 @@ public class NameNodeProxies { return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy); } + private static RefreshCallQueueProtocol + createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address, + Configuration conf, UserGroupInformation ugi) throws IOException { + RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB) + createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class, 0); + return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy); + } + private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi) throws IOException { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1573052&r1=1573051&r2=1573052&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Feb 28 19:48:31 2014 @@ -56,6 +56,7 @@ import org.apache.hadoop.security.Refres import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.util.ExitUtil.ExitException; import org.apache.hadoop.util.GenericOptionsParser; @@ -224,6 +225,8 @@ public class NameNode implements NameNod return RefreshAuthorizationPolicyProtocol.versionID; } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){ return RefreshUserMappingsProtocol.versionID; + } else if (protocol.equals(RefreshCallQueueProtocol.class.getName())) { + return RefreshCallQueueProtocol.versionID; } else if (protocol.equals(GetUserMappingsProtocol.class.getName())){ return GetUserMappingsProtocol.versionID; } else { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1573052&r1=1573051&r2=1573052&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Feb 28 19:48:31 2014 @@ -138,6 +138,9 @@ import org.apache.hadoop.security.protoc import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService; @@ -215,6 +218,11 @@ class NameNodeRpcServer implements Namen BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService .newReflectiveBlockingService(refreshUserMappingXlator); + RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator = + new RefreshCallQueueProtocolServerSideTranslatorPB(this); + BlockingService refreshCallQueueService = RefreshCallQueueProtocolService + .newReflectiveBlockingService(refreshCallQueueXlator); + GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = new GetUserMappingsProtocolServerSideTranslatorPB(this); BlockingService getUserMappingService = GetUserMappingsProtocolService @@ -261,6 +269,9 @@ class NameNodeRpcServer implements Namen refreshAuthService, serviceRpcServer); DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, refreshUserMappingService, serviceRpcServer); + // We support Refreshing call queue here in case the client RPC queue is full + DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, + refreshCallQueueService, serviceRpcServer); DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, serviceRpcServer); @@ -303,6 +314,8 @@ class NameNodeRpcServer implements Namen refreshAuthService, clientRpcServer); DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, refreshUserMappingService, clientRpcServer); + DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, + refreshCallQueueService, clientRpcServer); DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, clientRpcServer); @@ -1095,6 +1108,17 @@ class NameNodeRpcServer implements Namen ProxyUsers.refreshSuperUserGroupsConfiguration(); } + + @Override // RefreshCallQueueProtocol + public void refreshCallQueue() { + LOG.info("Refreshing call queue."); + + Configuration conf = new Configuration(); + clientRpcServer.refreshCallQueue(conf); + if (this.serviceRpcServer != null) { + serviceRpcServer.refreshCallQueue(conf); + } + } @Override // GetUserMappingsProtocol public String[] getGroupsForUser(String user) throws IOException { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java?rev=1573052&r1=1573051&r2=1573052&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java Fri Feb 28 19:48:31 2014 @@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServicePro import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.RefreshUserMappingsProtocol; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol; /** The full set of RPC methods implemented by the Namenode. */ @@ -33,6 +34,7 @@ public interface NamenodeProtocols NamenodeProtocol, RefreshAuthorizationPolicyProtocol, RefreshUserMappingsProtocol, + RefreshCallQueueProtocol, GetUserMappingsProtocol, HAServiceProtocol { } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1573052&r1=1573051&r2=1573052&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Fri Feb 28 19:48:31 2014 @@ -62,6 +62,7 @@ import org.apache.hadoop.security.Refres import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; @@ -580,6 +581,7 @@ public class DFSAdmin extends FsShell { "\t[-refreshServiceAcl]\n" + "\t[-refreshUserToGroupsMappings]\n" + "\t[refreshSuperUserGroupsConfiguration]\n" + + "\t[-refreshCallQueue]\n" + "\t[-printTopology]\n" + "\t[-refreshNamenodes datanodehost:port]\n"+ "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+ @@ -649,6 +651,8 @@ public class DFSAdmin extends FsShell { String refreshSuperUserGroupsConfiguration = "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups mappings\n"; + String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n"; + String printTopology = "-printTopology: Print a tree of the racks and their\n" + "\t\tnodes as reported by the Namenode\n"; @@ -717,6 +721,8 @@ public class DFSAdmin extends FsShell { System.out.println(refreshUserToGroupsMappings); } else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) { System.out.println(refreshSuperUserGroupsConfiguration); + } else if ("refreshCallQueue".equals(cmd)) { + System.out.println(refreshCallQueue); } else if ("printTopology".equals(cmd)) { System.out.println(printTopology); } else if ("refreshNamenodes".equals(cmd)) { @@ -750,6 +756,7 @@ public class DFSAdmin extends FsShell { System.out.println(refreshServiceAcl); System.out.println(refreshUserToGroupsMappings); System.out.println(refreshSuperUserGroupsConfiguration); + System.out.println(refreshCallQueue); System.out.println(printTopology); System.out.println(refreshNamenodes); System.out.println(deleteBlockPool); @@ -939,6 +946,27 @@ public class DFSAdmin extends FsShell { return 0; } + public int refreshCallQueue() throws IOException { + // Get the current configuration + Configuration conf = getConf(); + + // for security authorization + // server principal for this call + // should be NN's one. + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, + conf.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "")); + + // Create the client + RefreshCallQueueProtocol refreshProtocol = + NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), + RefreshCallQueueProtocol.class).getProxy(); + + // Refresh the user-to-groups mappings + refreshProtocol.refreshCallQueue(); + + return 0; + } + /** * Displays format of commands. * @param cmd The command that is being executed. @@ -995,6 +1023,9 @@ public class DFSAdmin extends FsShell { } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) { System.err.println("Usage: java DFSAdmin" + " [-refreshSuperUserGroupsConfiguration]"); + } else if ("-refreshCallQueue".equals(cmd)) { + System.err.println("Usage: java DFSAdmin" + + " [-refreshCallQueue]"); } else if ("-printTopology".equals(cmd)) { System.err.println("Usage: java DFSAdmin" + " [-printTopology]"); @@ -1026,6 +1057,7 @@ public class DFSAdmin extends FsShell { System.err.println(" [-refreshServiceAcl]"); System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]"); + System.err.println(" [-refreshCallQueue]"); System.err.println(" [-printTopology]"); System.err.println(" [-refreshNamenodes datanodehost:port]"); System.err.println(" [-deleteBlockPool datanode-host:port blockpoolId [force]]"); @@ -1197,6 +1229,8 @@ public class DFSAdmin extends FsShell { exitCode = refreshUserToGroupsMappings(); } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) { exitCode = refreshSuperUserGroupsConfiguration(); + } else if ("-refreshCallQueue".equals(cmd)) { + exitCode = refreshCallQueue(); } else if ("-printTopology".equals(cmd)) { exitCode = printTopology(); } else if ("-refreshNamenodes".equals(cmd)) { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java?rev=1573052&r1=1573051&r2=1573052&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java Fri Feb 28 19:48:31 2014 @@ -39,6 +39,8 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB; import org.junit.AfterClass; @@ -169,4 +171,15 @@ public class TestIsMethodSupported { assertTrue( translator.isMethodSupported("refreshUserToGroupsMappings")); } + + @Test + public void testRefreshCallQueueProtocol() throws IOException { + RefreshCallQueueProtocolClientSideTranslatorPB translator = + (RefreshCallQueueProtocolClientSideTranslatorPB) + NameNodeProxies.createNonHAProxy(conf, nnAddress, + RefreshCallQueueProtocol.class, + UserGroupInformation.getCurrentUser(), true).getProxy(); + assertTrue( + translator.isMethodSupported("refreshCallQueue")); + } }