Author: arp Date: Thu Jun 12 01:27:39 2014 New Revision: 1602055 URL: http://svn.apache.org/r1602055 Log: HADOOP-10376. Refactor refresh*Protocols into a single generic refreshConfigProtocol. (Contributed by Chris Li)
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1602055&r1=1602054&r2=1602055&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Thu Jun 12 01:27:39 2014 @@ -33,6 +33,7 @@ import org.apache.hadoop.security.author import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.GenericRefreshProtocol; /** * {@link PolicyProvider} for HDFS protocols. @@ -68,7 +69,10 @@ public class HDFSPolicyProvider extends GetUserMappingsProtocol.class), new Service( CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE, - RefreshCallQueueProtocol.class) + RefreshCallQueueProtocol.class), + new Service( + CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH, + GenericRefreshProtocol.class) }; @Override Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1602055&r1=1602054&r2=1602055&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Jun 12 01:27:39 2014 @@ -132,6 +132,8 @@ import org.apache.hadoop.ipc.ProtobufRpc import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.WritableRpcEngine; +import org.apache.hadoop.ipc.RefreshRegistry; +import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; @@ -147,6 +149,9 @@ import org.apache.hadoop.security.protoc import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB; import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService; @@ -229,6 +234,11 @@ class NameNodeRpcServer implements Namen BlockingService refreshCallQueueService = RefreshCallQueueProtocolService .newReflectiveBlockingService(refreshCallQueueXlator); + GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator = + new GenericRefreshProtocolServerSideTranslatorPB(this); + BlockingService genericRefreshService = GenericRefreshProtocolService + .newReflectiveBlockingService(genericRefreshXlator); + GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = new GetUserMappingsProtocolServerSideTranslatorPB(this); BlockingService getUserMappingService = GetUserMappingsProtocolService @@ -278,6 +288,8 @@ class NameNodeRpcServer implements Namen // We support Refreshing call queue here in case the client RPC queue is full DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, refreshCallQueueService, serviceRpcServer); + DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, + genericRefreshService, serviceRpcServer); DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, serviceRpcServer); @@ -322,6 +334,8 @@ class NameNodeRpcServer implements Namen refreshUserMappingService, clientRpcServer); DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, refreshCallQueueService, clientRpcServer); + DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, + genericRefreshService, clientRpcServer); DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, clientRpcServer); @@ -1154,6 +1168,12 @@ class NameNodeRpcServer implements Namen serviceRpcServer.refreshCallQueue(conf); } } + + @Override // GenericRefreshProtocol + public Collection<RefreshResponse> refresh(String identifier, String[] args) { + // Let the registry handle as needed + return RefreshRegistry.defaultRegistry().dispatch(identifier, args); + } @Override // GetUserMappingsProtocol public String[] getGroupsForUser(String user) throws IOException { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java?rev=1602055&r1=1602054&r2=1602055&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java Thu Jun 12 01:27:39 2014 @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.C import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.GenericRefreshProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol; /** The full set of RPC methods implemented by the Namenode. */ @@ -35,6 +36,7 @@ public interface NamenodeProtocols RefreshAuthorizationPolicyProtocol, RefreshUserMappingsProtocol, RefreshCallQueueProtocol, + GenericRefreshProtocol, GetUserMappingsProtocol, HAServiceProtocol { } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1602055&r1=1602054&r2=1602055&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Thu Jun 12 01:27:39 2014 @@ -26,6 +26,7 @@ import java.net.URL; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -62,12 +63,17 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.GenericRefreshProtocol; +import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; -import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; @@ -688,6 +694,7 @@ public class DFSAdmin extends FsShell { "\t[-refreshUserToGroupsMappings]\n" + "\t[-refreshSuperUserGroupsConfiguration]\n" + "\t[-refreshCallQueue]\n" + + "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" + "\t[-printTopology]\n" + "\t[-refreshNamenodes datanodehost:port]\n"+ "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+ @@ -764,6 +771,10 @@ public class DFSAdmin extends FsShell { String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n"; + String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" + + "\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" + + "\ton <hostname:port>. All other args after are sent to the host."; + String printTopology = "-printTopology: Print a tree of the racks and their\n" + "\t\tnodes as reported by the Namenode\n"; @@ -848,6 +859,8 @@ public class DFSAdmin extends FsShell { System.out.println(refreshSuperUserGroupsConfiguration); } else if ("refreshCallQueue".equals(cmd)) { System.out.println(refreshCallQueue); + } else if ("refresh".equals(cmd)) { + System.out.println(genericRefresh); } else if ("printTopology".equals(cmd)) { System.out.println(printTopology); } else if ("refreshNamenodes".equals(cmd)) { @@ -887,6 +900,7 @@ public class DFSAdmin extends FsShell { System.out.println(refreshUserToGroupsMappings); System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshCallQueue); + System.out.println(genericRefresh); System.out.println(printTopology); System.out.println(refreshNamenodes); System.out.println(deleteBlockPool); @@ -1100,6 +1114,56 @@ public class DFSAdmin extends FsShell { return 0; } + public int genericRefresh(String[] argv, int i) throws IOException { + String hostport = argv[i++]; + String identifier = argv[i++]; + String[] args = Arrays.copyOfRange(argv, i, argv.length); + + // Get the current configuration + Configuration conf = getConf(); + + // for security authorization + // server principal for this call + // should be NN's one. + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, + conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); + + // Create the client + Class<?> xface = GenericRefreshProtocolPB.class; + InetSocketAddress address = NetUtils.createSocketAddr(hostport); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class); + GenericRefreshProtocolPB proxy = (GenericRefreshProtocolPB) + RPC.getProxy(xface, RPC.getProtocolVersion(xface), address, + ugi, conf, NetUtils.getDefaultSocketFactory(conf), 0); + + GenericRefreshProtocol xlator = + new GenericRefreshProtocolClientSideTranslatorPB(proxy); + + // Refresh + Collection<RefreshResponse> responses = xlator.refresh(identifier, args); + + int returnCode = 0; + + // Print refresh responses + System.out.println("Refresh Responses:\n"); + for (RefreshResponse response : responses) { + System.out.println(response.toString()); + + if (returnCode == 0 && response.getReturnCode() != 0) { + // This is the first non-zero return code, so we should return this + returnCode = response.getReturnCode(); + } else if (returnCode != 0 && response.getReturnCode() != 0) { + // Then now we have multiple non-zero return codes, + // so we merge them into -1 + returnCode = -1; + } + } + + return returnCode; + } + /** * Displays format of commands. * @param cmd The command that is being executed. @@ -1162,6 +1226,9 @@ public class DFSAdmin extends FsShell { } else if ("-refreshCallQueue".equals(cmd)) { System.err.println("Usage: java DFSAdmin" + " [-refreshCallQueue]"); + } else if ("-refresh".equals(cmd)) { + System.err.println("Usage: java DFSAdmin" + + " [-refresh <hostname:port> <resource_identifier> [arg1..argn]"); } else if ("-printTopology".equals(cmd)) { System.err.println("Usage: java DFSAdmin" + " [-printTopology]"); @@ -1195,6 +1262,7 @@ public class DFSAdmin extends FsShell { System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]"); System.err.println(" [-refreshCallQueue]"); + System.err.println(" [-refresh]"); System.err.println(" [-printTopology]"); System.err.println(" [-refreshNamenodes datanodehost:port]"); System.err.println(" [-deleteBlockPool datanode-host:port blockpoolId [force]]"); @@ -1292,6 +1360,11 @@ public class DFSAdmin extends FsShell { printUsage(cmd); return exitCode; } + } else if ("-refresh".equals(cmd)) { + if (argv.length < 3) { + printUsage(cmd); + return exitCode; + } } else if ("-refreshUserToGroupsMappings".equals(cmd)) { if (argv.length != 1) { printUsage(cmd); @@ -1387,6 +1460,8 @@ public class DFSAdmin extends FsShell { exitCode = refreshSuperUserGroupsConfiguration(); } else if ("-refreshCallQueue".equals(cmd)) { exitCode = refreshCallQueue(); + } else if ("-refresh".equals(cmd)) { + exitCode = genericRefresh(argv, i); } else if ("-printTopology".equals(cmd)) { exitCode = printTopology(); } else if ("-refreshNamenodes".equals(cmd)) { Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java?rev=1602055&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java Thu Jun 12 01:27:39 2014 @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.apache.hadoop.ipc.RefreshHandler; + +import org.apache.hadoop.ipc.RefreshRegistry; +import org.apache.hadoop.ipc.RefreshResponse; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.mockito.Mockito; + +/** + * Before all tests, a MiniDFSCluster is spun up. + * Before each test, mock refresh handlers are created and registered. + * After each test, the mock handlers are unregistered. + * After all tests, the cluster is spun down. + */ +public class TestGenericRefresh { + private static MiniDFSCluster cluster; + private static Configuration config; + private static final int NNPort = 54222; + + private static RefreshHandler firstHandler; + private static RefreshHandler secondHandler; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + config = new Configuration(); + config.set("hadoop.security.authorization", "true"); + + FileSystem.setDefaultUri(config, "hdfs://localhost:" + NNPort); + cluster = new MiniDFSCluster.Builder(config).nameNodePort(NNPort).build(); + cluster.waitActive(); + } + + @AfterClass + public static void tearDownBeforeClass() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Before + public void setUp() throws Exception { + // Register Handlers, first one just sends an ok response + firstHandler = Mockito.mock(RefreshHandler.class); + Mockito.stub(firstHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class))) + .toReturn(RefreshResponse.successResponse()); + RefreshRegistry.defaultRegistry().register("firstHandler", firstHandler); + + // Second handler has conditional response for testing args + secondHandler = Mockito.mock(RefreshHandler.class); + Mockito.stub(secondHandler.handleRefresh("secondHandler", new String[]{"one", "two"})) + .toReturn(new RefreshResponse(3, "three")); + Mockito.stub(secondHandler.handleRefresh("secondHandler", new String[]{"one"})) + .toReturn(new RefreshResponse(2, "two")); + RefreshRegistry.defaultRegistry().register("secondHandler", secondHandler); + } + + @After + public void tearDown() throws Exception { + RefreshRegistry.defaultRegistry().unregisterAll("firstHandler"); + RefreshRegistry.defaultRegistry().unregisterAll("secondHandler"); + } + + @Test + public void testInvalidCommand() throws Exception { + DFSAdmin admin = new DFSAdmin(config); + String [] args = new String[]{"-refresh", "nn"}; + int exitCode = admin.run(args); + assertEquals("DFSAdmin should fail due to bad args", -1, exitCode); + } + + @Test + public void testInvalidIdentifier() throws Exception { + DFSAdmin admin = new DFSAdmin(config); + String [] args = new String[]{"-refresh", "localhost:" + NNPort, "unregisteredIdentity"}; + int exitCode = admin.run(args); + assertEquals("DFSAdmin should fail due to no handler registered", -1, exitCode); + } + + @Test + public void testValidIdentifier() throws Exception { + DFSAdmin admin = new DFSAdmin(config); + String[] args = new String[]{"-refresh", "localhost:" + NNPort, "firstHandler"}; + int exitCode = admin.run(args); + assertEquals("DFSAdmin should succeed", 0, exitCode); + + Mockito.verify(firstHandler).handleRefresh("firstHandler", new String[]{}); + // Second handler was never called + Mockito.verify(secondHandler, Mockito.never()) + .handleRefresh(Mockito.anyString(), Mockito.any(String[].class)); + } + + @Test + public void testVariableArgs() throws Exception { + DFSAdmin admin = new DFSAdmin(config); + String[] args = new String[]{"-refresh", "localhost:" + NNPort, "secondHandler", "one"}; + int exitCode = admin.run(args); + assertEquals("DFSAdmin should return 2", 2, exitCode); + + exitCode = admin.run(new String[]{"-refresh", "localhost:" + NNPort, "secondHandler", "one", "two"}); + assertEquals("DFSAdmin should now return 3", 3, exitCode); + + Mockito.verify(secondHandler).handleRefresh("secondHandler", new String[]{"one"}); + Mockito.verify(secondHandler).handleRefresh("secondHandler", new String[]{"one", "two"}); + } + + @Test + public void testUnregistration() throws Exception { + RefreshRegistry.defaultRegistry().unregisterAll("firstHandler"); + + // And now this should fail + DFSAdmin admin = new DFSAdmin(config); + String[] args = new String[]{"-refresh", "localhost:" + NNPort, "firstHandler"}; + int exitCode = admin.run(args); + assertEquals("DFSAdmin should return -1", -1, exitCode); + } + + @Test + public void testUnregistrationReturnValue() { + RefreshHandler mockHandler = Mockito.mock(RefreshHandler.class); + RefreshRegistry.defaultRegistry().register("test", mockHandler); + boolean ret = RefreshRegistry.defaultRegistry().unregister("test", mockHandler); + assertTrue(ret); + } + + @Test + public void testMultipleRegistration() throws Exception { + RefreshRegistry.defaultRegistry().register("sharedId", firstHandler); + RefreshRegistry.defaultRegistry().register("sharedId", secondHandler); + + // this should trigger both + DFSAdmin admin = new DFSAdmin(config); + String[] args = new String[]{"-refresh", "localhost:" + NNPort, "sharedId", "one"}; + int exitCode = admin.run(args); + assertEquals(-1, exitCode); // -1 because one of the responses is unregistered + + // verify we called both + Mockito.verify(firstHandler).handleRefresh("sharedId", new String[]{"one"}); + Mockito.verify(secondHandler).handleRefresh("sharedId", new String[]{"one"}); + + RefreshRegistry.defaultRegistry().unregisterAll("sharedId"); + } + + @Test + public void testMultipleReturnCodeMerging() throws Exception { + // Two handlers which return two non-zero values + RefreshHandler handlerOne = Mockito.mock(RefreshHandler.class); + Mockito.stub(handlerOne.handleRefresh(Mockito.anyString(), Mockito.any(String[].class))) + .toReturn(new RefreshResponse(23, "Twenty Three")); + + RefreshHandler handlerTwo = Mockito.mock(RefreshHandler.class); + Mockito.stub(handlerTwo.handleRefresh(Mockito.anyString(), Mockito.any(String[].class))) + .toReturn(new RefreshResponse(10, "Ten")); + + // Then registered to the same ID + RefreshRegistry.defaultRegistry().register("shared", handlerOne); + RefreshRegistry.defaultRegistry().register("shared", handlerTwo); + + // We refresh both + DFSAdmin admin = new DFSAdmin(config); + String[] args = new String[]{"-refresh", "localhost:" + NNPort, "shared"}; + int exitCode = admin.run(args); + assertEquals(-1, exitCode); // We get -1 because of our logic for melding non-zero return codes + + // Verify we called both + Mockito.verify(handlerOne).handleRefresh("shared", new String[]{}); + Mockito.verify(handlerTwo).handleRefresh("shared", new String[]{}); + + RefreshRegistry.defaultRegistry().unregisterAll("shared"); + } + + @Test + public void testExceptionResultsInNormalError() throws Exception { + // In this test, we ensure that all handlers are called even if we throw an exception in one + RefreshHandler exceptionalHandler = Mockito.mock(RefreshHandler.class); + Mockito.stub(exceptionalHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class))) + .toThrow(new RuntimeException("Exceptional Handler Throws Exception")); + + RefreshHandler otherExceptionalHandler = Mockito.mock(RefreshHandler.class); + Mockito.stub(otherExceptionalHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class))) + .toThrow(new RuntimeException("More Exceptions")); + + RefreshRegistry.defaultRegistry().register("exceptional", exceptionalHandler); + RefreshRegistry.defaultRegistry().register("exceptional", otherExceptionalHandler); + + DFSAdmin admin = new DFSAdmin(config); + String[] args = new String[]{"-refresh", "localhost:" + NNPort, "exceptional"}; + int exitCode = admin.run(args); + assertEquals(-1, exitCode); // Exceptions result in a -1 + + Mockito.verify(exceptionalHandler).handleRefresh("exceptional", new String[]{}); + Mockito.verify(otherExceptionalHandler).handleRefresh("exceptional", new String[]{}); + + RefreshRegistry.defaultRegistry().unregisterAll("exceptional"); + } +} \ No newline at end of file