Author: arp Date: Thu Jun 12 01:43:01 2014 New Revision: 1602057 URL: http://svn.apache.org/r1602057 Log: HADOOP-10376: Merging r1602055 from trunk to branch-2.
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java - copied unchanged from r1602055, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1602057&r1=1602056&r2=1602057&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Thu Jun 12 01:43:01 2014 @@ -33,6 +33,7 @@ import org.apache.hadoop.security.author import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.GenericRefreshProtocol; /** * {@link PolicyProvider} for HDFS protocols. @@ -68,7 +69,10 @@ public class HDFSPolicyProvider extends GetUserMappingsProtocol.class), new Service( CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE, - RefreshCallQueueProtocol.class) + RefreshCallQueueProtocol.class), + new Service( + CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH, + GenericRefreshProtocol.class) }; @Override Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1602057&r1=1602056&r2=1602057&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Jun 12 01:43:01 2014 @@ -132,6 +132,8 @@ import org.apache.hadoop.ipc.ProtobufRpc import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.WritableRpcEngine; +import org.apache.hadoop.ipc.RefreshRegistry; +import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; @@ -147,6 +149,9 @@ import org.apache.hadoop.security.protoc import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB; import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService; @@ -229,6 +234,11 @@ class NameNodeRpcServer implements Namen BlockingService refreshCallQueueService = RefreshCallQueueProtocolService .newReflectiveBlockingService(refreshCallQueueXlator); + GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator = + new GenericRefreshProtocolServerSideTranslatorPB(this); + BlockingService genericRefreshService = GenericRefreshProtocolService + .newReflectiveBlockingService(genericRefreshXlator); + GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = new GetUserMappingsProtocolServerSideTranslatorPB(this); BlockingService getUserMappingService = GetUserMappingsProtocolService @@ -277,6 +287,8 @@ class NameNodeRpcServer implements Namen // We support Refreshing call queue here in case the client RPC queue is full DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, refreshCallQueueService, serviceRpcServer); + DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, + genericRefreshService, serviceRpcServer); DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, serviceRpcServer); @@ -317,6 +329,8 @@ class NameNodeRpcServer implements Namen refreshUserMappingService, clientRpcServer); DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, refreshCallQueueService, clientRpcServer); + DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, + genericRefreshService, clientRpcServer); DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, clientRpcServer); @@ -1150,6 +1164,12 @@ class NameNodeRpcServer implements Namen serviceRpcServer.refreshCallQueue(conf); } } + + @Override // GenericRefreshProtocol + public Collection<RefreshResponse> refresh(String identifier, String[] args) { + // Let the registry handle as needed + return RefreshRegistry.defaultRegistry().dispatch(identifier, args); + } @Override // GetUserMappingsProtocol public String[] getGroupsForUser(String user) throws IOException { Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java?rev=1602057&r1=1602056&r2=1602057&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java Thu Jun 12 01:43:01 2014 @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.C import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.GenericRefreshProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol; /** The full set of RPC methods implemented by the Namenode. */ @@ -35,6 +36,7 @@ public interface NamenodeProtocols RefreshAuthorizationPolicyProtocol, RefreshUserMappingsProtocol, RefreshCallQueueProtocol, + GenericRefreshProtocol, GetUserMappingsProtocol, HAServiceProtocol { } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1602057&r1=1602056&r2=1602057&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Thu Jun 12 01:43:01 2014 @@ -26,6 +26,7 @@ import java.net.URL; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -62,12 +63,17 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.GenericRefreshProtocol; +import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; -import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; @@ -688,6 +694,7 @@ public class DFSAdmin extends FsShell { "\t[-refreshUserToGroupsMappings]\n" + "\t[-refreshSuperUserGroupsConfiguration]\n" + "\t[-refreshCallQueue]\n" + + "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" + "\t[-printTopology]\n" + "\t[-refreshNamenodes datanodehost:port]\n"+ "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+ @@ -764,6 +771,10 @@ public class DFSAdmin extends FsShell { String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n"; + String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" + + "\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" + + "\ton <hostname:port>. All other args after are sent to the host."; + String printTopology = "-printTopology: Print a tree of the racks and their\n" + "\t\tnodes as reported by the Namenode\n"; @@ -848,6 +859,8 @@ public class DFSAdmin extends FsShell { System.out.println(refreshSuperUserGroupsConfiguration); } else if ("refreshCallQueue".equals(cmd)) { System.out.println(refreshCallQueue); + } else if ("refresh".equals(cmd)) { + System.out.println(genericRefresh); } else if ("printTopology".equals(cmd)) { System.out.println(printTopology); } else if ("refreshNamenodes".equals(cmd)) { @@ -887,6 +900,7 @@ public class DFSAdmin extends FsShell { System.out.println(refreshUserToGroupsMappings); System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshCallQueue); + System.out.println(genericRefresh); System.out.println(printTopology); System.out.println(refreshNamenodes); System.out.println(deleteBlockPool); @@ -1100,6 +1114,56 @@ public class DFSAdmin extends FsShell { return 0; } + public int genericRefresh(String[] argv, int i) throws IOException { + String hostport = argv[i++]; + String identifier = argv[i++]; + String[] args = Arrays.copyOfRange(argv, i, argv.length); + + // Get the current configuration + Configuration conf = getConf(); + + // for security authorization + // server principal for this call + // should be NN's one. + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, + conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); + + // Create the client + Class<?> xface = GenericRefreshProtocolPB.class; + InetSocketAddress address = NetUtils.createSocketAddr(hostport); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class); + GenericRefreshProtocolPB proxy = (GenericRefreshProtocolPB) + RPC.getProxy(xface, RPC.getProtocolVersion(xface), address, + ugi, conf, NetUtils.getDefaultSocketFactory(conf), 0); + + GenericRefreshProtocol xlator = + new GenericRefreshProtocolClientSideTranslatorPB(proxy); + + // Refresh + Collection<RefreshResponse> responses = xlator.refresh(identifier, args); + + int returnCode = 0; + + // Print refresh responses + System.out.println("Refresh Responses:\n"); + for (RefreshResponse response : responses) { + System.out.println(response.toString()); + + if (returnCode == 0 && response.getReturnCode() != 0) { + // This is the first non-zero return code, so we should return this + returnCode = response.getReturnCode(); + } else if (returnCode != 0 && response.getReturnCode() != 0) { + // Then now we have multiple non-zero return codes, + // so we merge them into -1 + returnCode = -1; + } + } + + return returnCode; + } + /** * Displays format of commands. * @param cmd The command that is being executed. @@ -1162,6 +1226,9 @@ public class DFSAdmin extends FsShell { } else if ("-refreshCallQueue".equals(cmd)) { System.err.println("Usage: java DFSAdmin" + " [-refreshCallQueue]"); + } else if ("-refresh".equals(cmd)) { + System.err.println("Usage: java DFSAdmin" + + " [-refresh <hostname:port> <resource_identifier> [arg1..argn]"); } else if ("-printTopology".equals(cmd)) { System.err.println("Usage: java DFSAdmin" + " [-printTopology]"); @@ -1195,6 +1262,7 @@ public class DFSAdmin extends FsShell { System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]"); System.err.println(" [-refreshCallQueue]"); + System.err.println(" [-refresh]"); System.err.println(" [-printTopology]"); System.err.println(" [-refreshNamenodes datanodehost:port]"); System.err.println(" [-deleteBlockPool datanode-host:port blockpoolId [force]]"); @@ -1292,6 +1360,11 @@ public class DFSAdmin extends FsShell { printUsage(cmd); return exitCode; } + } else if ("-refresh".equals(cmd)) { + if (argv.length < 3) { + printUsage(cmd); + return exitCode; + } } else if ("-refreshUserToGroupsMappings".equals(cmd)) { if (argv.length != 1) { printUsage(cmd); @@ -1387,6 +1460,8 @@ public class DFSAdmin extends FsShell { exitCode = refreshSuperUserGroupsConfiguration(); } else if ("-refreshCallQueue".equals(cmd)) { exitCode = refreshCallQueue(); + } else if ("-refresh".equals(cmd)) { + exitCode = genericRefresh(argv, i); } else if ("-printTopology".equals(cmd)) { exitCode = printTopology(); } else if ("-refreshNamenodes".equals(cmd)) {