Author: arp
Date: Thu Jun 12 01:43:01 2014
New Revision: 1602057

URL: http://svn.apache.org/r1602057
Log:
HADOOP-10376: Merging r1602055 from trunk to branch-2.

Added:
    
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java
      - copied unchanged from r1602055, 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java
Modified:
    
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
    
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
    
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

Modified: 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1602057&r1=1602056&r2=1602057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Thu Jun 12 01:43:01 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
 
 /**
  * {@link PolicyProvider} for HDFS protocols.
@@ -68,7 +69,10 @@ public class HDFSPolicyProvider extends 
         GetUserMappingsProtocol.class),
     new Service(
         CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE,
-        RefreshCallQueueProtocol.class)
+        RefreshCallQueueProtocol.class),
+    new Service(
+        CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH,
+        GenericRefreshProtocol.class)
   };
   
   @Override

Modified: 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1602057&r1=1602056&r2=1602057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Jun 12 01:43:01 2014
@@ -132,6 +132,8 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.ipc.RefreshRegistry;
+import org.apache.hadoop.ipc.RefreshResponse;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
@@ -147,6 +149,9 @@ import org.apache.hadoop.security.protoc
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
@@ -229,6 +234,11 @@ class NameNodeRpcServer implements Namen
     BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
         .newReflectiveBlockingService(refreshCallQueueXlator);
 
+    GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
+        new GenericRefreshProtocolServerSideTranslatorPB(this);
+    BlockingService genericRefreshService = GenericRefreshProtocolService
+        .newReflectiveBlockingService(genericRefreshXlator);
+
     GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = 
         new GetUserMappingsProtocolServerSideTranslatorPB(this);
     BlockingService getUserMappingService = GetUserMappingsProtocolService
@@ -277,6 +287,8 @@ class NameNodeRpcServer implements Namen
       // We support Refreshing call queue here in case the client RPC queue is full
       DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
           refreshCallQueueService, serviceRpcServer);
+      DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+          genericRefreshService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
           getUserMappingService, serviceRpcServer);
   
@@ -317,6 +329,8 @@ class NameNodeRpcServer implements Namen
         refreshUserMappingService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
         refreshCallQueueService, clientRpcServer);
+    DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+        genericRefreshService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
         getUserMappingService, clientRpcServer);
 
@@ -1150,6 +1164,12 @@ class NameNodeRpcServer implements Namen
       serviceRpcServer.refreshCallQueue(conf);
     }
   }
+
+  @Override // GenericRefreshProtocol
+  public Collection<RefreshResponse> refresh(String identifier, String[] args) {
+    // Let the registry handle as needed
+    return RefreshRegistry.defaultRegistry().dispatch(identifier, args);
+  }
   
   @Override // GetUserMappingsProtocol
   public String[] getGroupsForUser(String user) throws IOException {

Modified: 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java?rev=1602057&r1=1602056&r2=1602057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java Thu Jun 12 01:43:01 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 /** The full set of RPC methods implemented by the Namenode.  */
@@ -35,6 +36,7 @@ public interface NamenodeProtocols
           RefreshAuthorizationPolicyProtocol,
           RefreshUserMappingsProtocol,
           RefreshCallQueueProtocol,
+          GenericRefreshProtocol,
           GetUserMappingsProtocol,
           HAServiceProtocol {
 }

Modified: 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1602057&r1=1602056&r2=1602057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Thu Jun 12 01:43:01 2014
@@ -26,6 +26,7 @@ import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -62,12 +63,17 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
+import org.apache.hadoop.ipc.RefreshResponse;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
-import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -688,6 +694,7 @@ public class DFSAdmin extends FsShell {
       "\t[-refreshUserToGroupsMappings]\n" +
       "\t[-refreshSuperUserGroupsConfiguration]\n" +
       "\t[-refreshCallQueue]\n" +
+      "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
       "\t[-printTopology]\n" +
       "\t[-refreshNamenodes datanodehost:port]\n"+
       "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
@@ -764,6 +771,10 @@ public class DFSAdmin extends FsShell {
 
     String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
 
+    String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" +
+      "\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
+      "\ton <hostname:port>. All other args after are sent to the host.";
+
     String printTopology = "-printTopology: Print a tree of the racks and their\n" +
                            "\t\tnodes as reported by the Namenode\n";
     
@@ -848,6 +859,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshSuperUserGroupsConfiguration);
     } else if ("refreshCallQueue".equals(cmd)) {
       System.out.println(refreshCallQueue);
+    } else if ("refresh".equals(cmd)) {
+      System.out.println(genericRefresh);
     } else if ("printTopology".equals(cmd)) {
       System.out.println(printTopology);
     } else if ("refreshNamenodes".equals(cmd)) {
@@ -887,6 +900,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshUserToGroupsMappings);
       System.out.println(refreshSuperUserGroupsConfiguration);
       System.out.println(refreshCallQueue);
+      System.out.println(genericRefresh);
       System.out.println(printTopology);
       System.out.println(refreshNamenodes);
       System.out.println(deleteBlockPool);
@@ -1100,6 +1114,56 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  public int genericRefresh(String[] argv, int i) throws IOException {
+    String hostport = argv[i++];    // <hostname:port> of the server to refresh
+    String identifier = argv[i++];  // <resource_identifier> to refresh there
+    String[] args = Arrays.copyOfRange(argv, i, argv.length); // rest, passed on
+
+    // Use this tool's configuration (note: it is modified just below)
+    Configuration conf = getConf();
+
+    // For security authorization, the server principal for this call should
+    // be the NameNode's: copy the configured NN Kerberos principal (or "" if
+    // it is not set) into the service user-name key.
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
+      conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
+
+    // Create the protobuf RPC client proxy to hostport as the current user
+    Class<?> xface = GenericRefreshProtocolPB.class;
+    InetSocketAddress address = NetUtils.createSocketAddr(hostport);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
+    GenericRefreshProtocolPB proxy = (GenericRefreshProtocolPB)
+      RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
+        ugi, conf, NetUtils.getDefaultSocketFactory(conf), 0);
+
+    GenericRefreshProtocol xlator =
+      new GenericRefreshProtocolClientSideTranslatorPB(proxy);
+    // NOTE(review): nothing in this method stops or closes the proxy - confirm.
+    // Send the refresh request; the server answers with a set of responses
+    Collection<RefreshResponse> responses = xlator.refresh(identifier, args);
+
+    int returnCode = 0;
+
+    // Print every response and fold their return codes into one result
+    System.out.println("Refresh Responses:\n");
+    for (RefreshResponse response : responses) {
+      System.out.println(response.toString());
+
+      if (returnCode == 0 && response.getReturnCode() != 0) {
+        // First non-zero return code seen: report it unchanged
+        returnCode = response.getReturnCode();
+      } else if (returnCode != 0 && response.getReturnCode() != 0) {
+        // More than one response failed, so no single code applies:
+        // collapse the result to -1
+        returnCode = -1;
+      }
+    }
+
+    return returnCode;
+  }
+
   /**
    * Displays format of commands.
    * @param cmd The command that is being executed.
@@ -1162,6 +1226,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-refreshCallQueue".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-refreshCallQueue]");
+    } else if ("-refresh".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-refresh <hostname:port> <resource_identifier> [arg1..argn]");
     } else if ("-printTopology".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-printTopology]");
@@ -1195,6 +1262,7 @@ public class DFSAdmin extends FsShell {
       System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshSuperUserGroupsConfiguration]");
       System.err.println("           [-refreshCallQueue]");
+      System.err.println("           [-refresh]");
       System.err.println("           [-printTopology]");
       System.err.println("           [-refreshNamenodes datanodehost:port]");
       System.err.println("           [-deleteBlockPool datanode-host:port blockpoolId [force]]");
@@ -1292,6 +1360,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-refresh".equals(cmd)) {
+      if (argv.length < 3) {
+        printUsage(cmd);
+        return exitCode;
+      }
     } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
       if (argv.length != 1) {
         printUsage(cmd);
@@ -1387,6 +1460,8 @@ public class DFSAdmin extends FsShell {
         exitCode = refreshSuperUserGroupsConfiguration();
       } else if ("-refreshCallQueue".equals(cmd)) {
         exitCode = refreshCallQueue();
+      } else if ("-refresh".equals(cmd)) {
+        exitCode = genericRefresh(argv, i);
       } else if ("-printTopology".equals(cmd)) {
         exitCode = printTopology();
       } else if ("-refreshNamenodes".equals(cmd)) {


Reply via email to