Repository: ambari Updated Branches: refs/heads/trunk cc076cf34 -> 3d397dc04
http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index 7224924..c5c5bde 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -25,14 +25,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.*; import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -52,9 +45,7 @@ import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ExecutionCommand; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.HostsMap; -import org.apache.ambari.server.serveraction.ServerAction; -import org.apache.ambari.server.serveraction.ServerActionManager; -import org.apache.ambari.server.serveraction.ServerActionManagerImpl; +import org.apache.ambari.server.serveraction.MockServerAction; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Host; @@ -65,6 +56,7 @@ import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.ServiceComponentHostEvent; import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent; import org.apache.ambari.server.utils.StageUtils; import org.easymock.Capture; @@ -86,6 +78,8 @@ public class TestActionScheduler { private static final String CLUSTER_HOST_INFO_UPDATED = "{all_hosts=[c6401.ambari.apache.org," + " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org," + " c6402.ambari.apache.org]}"; + + private final String serverHostname = StageUtils.getHostName(); private final String hostname = "ahost.ambari.apache.org"; private final int MAX_CYCLE_ITERATIONS = 100; @@ -96,7 +90,7 @@ public class TestActionScheduler { */ @Test public void testActionSchedule() throws Exception { - + Type type = new TypeToken<Map<String, Set<String>>>() {}.getType(); Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type); @@ -116,7 +110,7 @@ public class TestActionScheduler { when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - + Host host = mock(Host.class); HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); @@ -141,7 +135,7 @@ public class TestActionScheduler { //Keep large number of attempts so that the task is not expired finally //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm, - 10000, new HostsMap((String) null), null, unitOfWork, conf); + 10000, new HostsMap((String) null), unitOfWork, conf); scheduler.setTaskTimeoutAdjustment(false); List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); @@ -167,7 +161,7 @@ public 
class TestActionScheduler { int expectedQueueSize, ActionScheduler scheduler) { int cycleCount = 0; while (cycleCount++ <= MAX_CYCLE_ITERATIONS) { - List<AgentCommand> ac = aq.dequeueAll(hostname); + List<AgentCommand> ac = aq.dequeueAll(hostname); if (ac != null) { if (ac.size() == expectedQueueSize) { return ac; @@ -239,7 +233,7 @@ public class TestActionScheduler { //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3, - new HostsMap((String) null), null, unitOfWork, conf); + new HostsMap((String) null), unitOfWork, conf); scheduler.setTaskTimeoutAdjustment(false); // Start the thread @@ -305,12 +299,10 @@ public class TestActionScheduler { } }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString()); - ServerActionManager sam = EasyMock.createNiceMock(ServerActionManager.class); - //Small action timeout to test rescheduling ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class). withConstructor((long) 100, (long) 50, db, aq, fsm, 3, - new HostsMap((String) null), sam, unitOfWork, conf). + new HostsMap((String) null), unitOfWork, conf). addMockedMethod("cancelHostRoleCommands"). 
createMock(); scheduler.cancelHostRoleCommands((Collection<HostRoleCommand>)EasyMock.anyObject(),EasyMock.anyObject(String.class)); @@ -425,7 +417,7 @@ public class TestActionScheduler { // Make sure the NN install doesn't timeout ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3, - new HostsMap((String) null), null, unitOfWork, conf); + new HostsMap((String) null), unitOfWork, conf); scheduler.setTaskTimeoutAdjustment(false); int cycleCount=0; @@ -485,7 +477,13 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + Host host = mock(Host.class); + + when(host.getHostName()).thenReturn(serverHostname); + when(host.getState()).thenReturn(HostState.HEALTHY); + when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); + when(fsm.getHost(anyString())).thenReturn(host); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); @@ -493,14 +491,12 @@ public class TestActionScheduler { HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); - hosts.put(hostname, sch); + hosts.put(serverHostname, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); List<Stage> stages = new ArrayList<Stage>(); Map<String, String> payload = new HashMap<String, String>(); - payload.put(ServerAction.PayloadName.CLUSTER_NAME, "cluster1"); - payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2"); - final Stage s = getStageWithServerAction(1, 977, hostname, payload, "test"); + final Stage s = getStageWithServerAction(1, 977, payload, "test", 300); stages.add(s); ActionDBAccessor db = mock(ActionDBAccessor.class); @@ -522,21 +518,134 @@ public class TestActionScheduler { } }).when(db).updateHostRoleState(anyString(), 
anyLong(), anyLong(), anyString(), any(CommandReport.class)); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION"); + } + }).when(db).getTask(anyLong()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + String host = (String) invocation.getArguments()[0]; + String role = (String) invocation.getArguments()[1]; + HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2]; + + HostRoleCommand task = s.getHostRoleCommand(host, role); + + if (task.getStatus() == status) { + return Arrays.asList(task); + } else { + return null; + } + } + }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class)); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), new ServerActionManagerImpl(fsm), - unitOfWork, conf); + new HostsMap((String) null), unitOfWork, conf); - int cycleCount=0; - while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION") + int cycleCount = 0; + while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION") .equals(HostRoleStatus.COMPLETED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { scheduler.doWork(); + scheduler.getServerActionExecutor().doWork(); } - assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"), + assertEquals(stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"), HostRoleStatus.COMPLETED); + } + + /** + * Test server action + */ + @Test + public void testServerActionTimeOut() throws Exception { + ActionQueue aq = new ActionQueue(); + Properties properties = new Properties(); + Configuration conf = new Configuration(properties); + Clusters fsm = mock(Clusters.class); + Cluster oneClusterMock = mock(Cluster.class); + Service serviceObj = mock(Service.class); + ServiceComponent scomp = 
mock(ServiceComponent.class); + ServiceComponentHost sch = mock(ServiceComponentHost.class); + UnitOfWork unitOfWork = mock(UnitOfWork.class); + when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); + when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); + when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); + when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); + when(serviceObj.getCluster()).thenReturn(oneClusterMock); + + Host host = mock(Host.class); + HashMap<String, ServiceComponentHost> hosts = + new HashMap<String, ServiceComponentHost>(); + hosts.put(serverHostname, sch); + when(scomp.getServiceComponentHosts()).thenReturn(hosts); + + when(fsm.getHost(anyString())).thenReturn(host); + when(host.getState()).thenReturn(HostState.HEALTHY); + when(host.getHostName()).thenReturn(serverHostname); + + List<Stage> stages = new ArrayList<Stage>(); + Map<String, String> payload = new HashMap<String, String>(); + payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "timeout"); + final Stage s = getStageWithServerAction(1, 977, payload, "test", 2); + stages.add(s); + + ActionDBAccessor db = mock(ActionDBAccessor.class); + + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + + when(db.getStagesInProgress()).thenReturn(stages); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + String host = (String) invocation.getArguments()[0]; + String role = (String) invocation.getArguments()[3]; + CommandReport commandReport = (CommandReport) invocation.getArguments()[4]; + HostRoleCommand command = s.getHostRoleCommand(host, role); + command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus())); + return null; + } + }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class)); + + doAnswer(new Answer() { + @Override + public Object 
answer(InvocationOnMock invocation) throws Throwable { + return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION"); + } + }).when(db).getTask(anyLong()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + String host = (String) invocation.getArguments()[0]; + String role = (String) invocation.getArguments()[1]; + HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2]; + + HostRoleCommand task = s.getHostRoleCommand(host, role); + + if (task.getStatus() == status) { + return Arrays.asList(task); + } else { + return null; + } + } + }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class)); + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, + new HostsMap((String) null), unitOfWork, conf); + + int cycleCount = 0; + while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION").isCompletedState() + && cycleCount++ <= MAX_CYCLE_ITERATIONS) { + scheduler.doWork(); + scheduler.getServerActionExecutor().doWork(); + } + + assertEquals(HostRoleStatus.TIMEDOUT, + stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION")); } @Test @@ -558,13 +667,13 @@ public class TestActionScheduler { HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); - hosts.put(hostname, sch); + hosts.put(serverHostname, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); List<Stage> stages = new ArrayList<Stage>(); Map<String, String> payload = new HashMap<String, String>(); - payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2"); - final Stage s = getStageWithServerAction(1, 977, hostname, payload, "test"); + payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "exception"); + final Stage s = getStageWithServerAction(1, 977, payload, "test", 300); stages.add(s); ActionDBAccessor db = mock(ActionDBAccessor.class); @@ -586,34 +695,57 @@ public class TestActionScheduler { 
} }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class)); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION"); + } + }).when(db).getTask(anyLong()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + String host = (String) invocation.getArguments()[0]; + String role = (String) invocation.getArguments()[1]; + HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2]; + + HostRoleCommand task = s.getHostRoleCommand(host, role); + + if (task.getStatus() == status) { + return Arrays.asList(task); + } else { + return null; + } + } + }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class)); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, conf); + new HostsMap((String) null), unitOfWork, conf); int cycleCount = 0; - while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION") + while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION") .equals(HostRoleStatus.FAILED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { scheduler.doWork(); + scheduler.getServerActionExecutor().doWork(); } - assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"), + assertEquals(stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"), HostRoleStatus.FAILED); assertEquals("test", stages.get(0).getRequestContext()); } - private static Stage getStageWithServerAction(long requestId, long stageId, String hostName, - Map<String, String> payload, String requestContext) { + private static Stage getStageWithServerAction(long requestId, long stageId, + Map<String, String> payload, String requestContext, + int timeout) { + String serverHostname = 
StageUtils.getHostName(); Stage stage = new Stage(requestId, "/tmp", "cluster1", 1L, requestContext, CLUSTER_HOST_INFO, - "", ""); + "{}", "{}"); stage.setStageId(stageId); - long now = System.currentTimeMillis(); - stage.addServerActionCommand(ServerAction.Command.FINALIZE_UPGRADE, Role.AMBARI_SERVER_ACTION, + + stage.addServerActionCommand(MockServerAction.class.getName(), Role.AMBARI_SERVER_ACTION, RoleCommand.EXECUTE, "cluster1", - new ServiceComponentHostUpgradeEvent("AMBARI_SERVER_ACTION", hostName, now, "HDP-0.2"), - hostName); - ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName, - Role.AMBARI_SERVER_ACTION.toString()).getExecutionCommand(); + new ServiceComponentHostServerActionEvent(serverHostname, System.currentTimeMillis()), + payload, + timeout); - execCmd.setCommandParams(payload); return stage; } @@ -666,14 +798,14 @@ public class TestActionScheduler { RoleCommand.START, Service.Type.HDFS, 3, 3, 3)); stages.add( - getStageWithSingleTask( - hostname3, "cluster1", Role.DATANODE, - RoleCommand.START, Service.Type.HDFS, 4, 4, 4)); + getStageWithSingleTask( + hostname3, "cluster1", Role.DATANODE, + RoleCommand.START, Service.Type.HDFS, 4, 4, 4)); stages.add( // Stage with the same request id, should not be scheduled - getStageWithSingleTask( - hostname4, "cluster1", Role.GANGLIA_MONITOR, - RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4)); + getStageWithSingleTask( + hostname4, "cluster1", Role.GANGLIA_MONITOR, + RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4)); ActionDBAccessor db = mock(ActionDBAccessor.class); @@ -686,12 +818,11 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration conf = new Configuration(properties); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), new ServerActionManagerImpl(fsm), - unitOfWork, conf); + new HostsMap((String) null), unitOfWork, conf); ActionManager am = new ActionManager( - 2, 2, aq, fsm, db, new 
HostsMap((String) null), - new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf); + 2, 2, aq, fsm, db, new HostsMap((String) null), + unitOfWork, requestFactory, conf); scheduler.doWork(); @@ -740,24 +871,24 @@ public class TestActionScheduler { hostname1, "cluster1", Role.DATANODE, RoleCommand.START, Service.Type.HDFS, 1, 1, 1)); stages.add( // Stage with the same hostname, should not be scheduled - getStageWithSingleTask( - hostname1, "cluster1", Role.GANGLIA_MONITOR, - RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2)); + getStageWithSingleTask( + hostname1, "cluster1", Role.GANGLIA_MONITOR, + RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2)); stages.add( - getStageWithSingleTask( - hostname2, "cluster1", Role.DATANODE, - RoleCommand.START, Service.Type.HDFS, 3, 3, 3)); + getStageWithSingleTask( + hostname2, "cluster1", Role.DATANODE, + RoleCommand.START, Service.Type.HDFS, 3, 3, 3)); stages.add( - getStageWithSingleTask( - hostname3, "cluster1", Role.DATANODE, - RoleCommand.START, Service.Type.HDFS, 4, 4, 4)); + getStageWithSingleTask( + hostname3, "cluster1", Role.DATANODE, + RoleCommand.START, Service.Type.HDFS, 4, 4, 4)); stages.add( // Stage with the same request id, should not be scheduled - getStageWithSingleTask( - hostname4, "cluster1", Role.GANGLIA_MONITOR, - RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4)); + getStageWithSingleTask( + hostname4, "cluster1", Role.GANGLIA_MONITOR, + RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4)); ActionDBAccessor db = mock(ActionDBAccessor.class); @@ -771,13 +902,13 @@ public class TestActionScheduler { properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false"); Configuration conf = new Configuration(properties); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), new ServerActionManagerImpl(fsm), + new HostsMap((String) null), unitOfWork, conf); ActionManager am = new ActionManager( - 2, 2, aq, fsm, db, new HostsMap((String) null), - new 
ServerActionManagerImpl(fsm), unitOfWork, - requestFactory, conf); + 2, 2, aq, fsm, db, new HostsMap((String) null), + unitOfWork, + requestFactory, conf); scheduler.doWork(); @@ -805,7 +936,7 @@ public class TestActionScheduler { when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - + String hostname1 = "ahost.ambari.apache.org"; String hostname2 = "bhost.ambari.apache.org"; HashMap<String, ServiceComponentHost> hosts = @@ -813,26 +944,26 @@ public class TestActionScheduler { hosts.put(hostname1, sch); hosts.put(hostname2, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); - + List<Stage> stages = new ArrayList<Stage>(); Stage backgroundStage = null; stages.add(//stage with background command backgroundStage = getStageWithSingleTask( hostname1, "cluster1", Role.NAMENODE, RoleCommand.CUSTOM_COMMAND, "REBALANCEHDFS", Service.Type.HDFS, 1, 1, 1)); - + Assert.assertEquals(AgentCommandType.BACKGROUND_EXECUTION_COMMAND ,backgroundStage.getExecutionCommands(hostname1).get(0).getExecutionCommand().getCommandType()); - + stages.add( // Stage with the same hostname, should be scheduled getStageWithSingleTask( hostname1, "cluster1", Role.GANGLIA_MONITOR, RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2)); - + stages.add( getStageWithSingleTask( hostname2, "cluster1", Role.DATANODE, RoleCommand.START, Service.Type.HDFS, 3, 3, 3)); - - + + ActionDBAccessor db = mock(ActionDBAccessor.class); Request request = mock(Request.class); @@ -840,21 +971,21 @@ public class TestActionScheduler { when(db.getRequest(anyLong())).thenReturn(request); when(db.getStagesInProgress()).thenReturn(stages); - + Properties properties = new Properties(); properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true"); Configuration conf = new Configuration(properties); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new 
HostsMap((String) null), new ServerActionManagerImpl(fsm), + new HostsMap((String) null), unitOfWork, conf); - + ActionManager am = new ActionManager( 2, 2, aq, fsm, db, new HostsMap((String) null), - new ServerActionManagerImpl(fsm), unitOfWork, + unitOfWork, requestFactory, conf); - + scheduler.doWork(); - + Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "NAMENODE")); Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE")); @@ -966,12 +1097,11 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration conf = new Configuration(properties); - ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm); Capture<Collection<HostRoleCommand>> cancelCommandList = new Capture<Collection<HostRoleCommand>>(); ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class). withConstructor((long)100, (long)50, db, aq, fsm, 3, - new HostsMap((String) null), serverActionManager, + new HostsMap((String) null), unitOfWork, conf). addMockedMethod("cancelHostRoleCommands"). 
createMock(); @@ -981,7 +1111,7 @@ public class TestActionScheduler { EasyMock.replay(scheduler); ActionManager am = new ActionManager( - 2, 2, aq, fsm, db, new HostsMap((String) null), serverActionManager, unitOfWork, requestFactory, conf); + 2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf); scheduler.doWork(); @@ -1140,10 +1270,10 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration conf = new Configuration(properties); ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3, - new HostsMap((String) null), new ServerActionManagerImpl(fsm), + new HostsMap((String) null), unitOfWork, conf); ActionManager am = new ActionManager( - 2, 10000, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf); + 2, 10000, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf); scheduler.doWork(); @@ -1326,9 +1456,9 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), - new ServerActionManagerImpl(fsm), unitOfWork, conf); + unitOfWork, conf); ActionManager am = new ActionManager( - 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf); + 2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf); scheduler.doWork(); @@ -1419,7 +1549,7 @@ public class TestActionScheduler { assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.NAMENODE))); assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.GANGLIA_SERVER))); } - + @Test public void testSuccessCriteria() { RoleStats rs1 = new RoleStats(1, (float)0.5); @@ -1427,37 +1557,37 @@ public class TestActionScheduler { assertTrue(rs1.isSuccessFactorMet()); rs1.numSucceeded = 0; assertFalse(rs1.isSuccessFactorMet()); - + RoleStats rs2 = 
new RoleStats(2, (float)0.5); rs2.numSucceeded = 1; assertTrue(rs2.isSuccessFactorMet()); - + RoleStats rs3 = new RoleStats(3, (float)0.5); rs3.numSucceeded = 2; assertTrue(rs2.isSuccessFactorMet()); rs3.numSucceeded = 1; assertFalse(rs3.isSuccessFactorMet()); - + RoleStats rs4 = new RoleStats(3, (float)1.0); rs4.numSucceeded = 2; assertFalse(rs3.isSuccessFactorMet()); } - + /** * This test sends verifies that ActionScheduler returns up-to-date cluster host info and caching works correctly. */ @Test public void testClusterHostInfoCache() throws Exception { - + Type type = new TypeToken<Map<String, Set<String>>>() {}.getType(); - + //Data for stages Map<String, Set<String>> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type); Map<String, Set<String>> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type); int stageId = 1; int requestId1 = 1; int requestId2 = 2; - + ActionQueue aq = new ActionQueue(); Properties properties = new Properties(); Configuration conf = new Configuration(properties); @@ -1497,19 +1627,19 @@ public class TestActionScheduler { //Keep large number of attempts so that the task is not expired finally //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm, - 10000, new HostsMap((String) null), null, unitOfWork, conf); + 10000, new HostsMap((String) null), unitOfWork, conf); scheduler.setTaskTimeoutAdjustment(false); List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); assertTrue(ac.get(0) instanceof ExecutionCommand); assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId()); - + assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo()); - + when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2)); - + //Verify that ActionSheduler does not return cached value of cluster host info for new requestId ac = waitForQueueSize(hostname, aq, 
1, scheduler); assertTrue(ac.get(0) instanceof ExecutionCommand); @@ -1576,7 +1706,7 @@ public class TestActionScheduler { when(db.getStagesInProgress()).thenReturn(stages); ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3, - new HostsMap((String) null), null, unitOfWork, conf); + new HostsMap((String) null), unitOfWork, conf); final CountDownLatch abortCalls = new CountDownLatch(2); @@ -1639,15 +1769,13 @@ public class TestActionScheduler { HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); - hosts.put(hostname, sch); + hosts.put(serverHostname, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); List<Stage> stages = new ArrayList<Stage>(); Map<String, String> payload = new HashMap<String, String>(); - payload.put(ServerAction.PayloadName.CLUSTER_NAME, "cluster1"); - payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2"); - final Stage s = getStageWithServerAction(1, 977, hostname, payload, "test"); - s.getExecutionCommands().get("ahost.ambari.apache.org").get(0).getExecutionCommand().setServiceName(null); + final Stage s = getStageWithServerAction(1, 977, payload, "test", 300); + s.getExecutionCommands().get(serverHostname).get(0).getExecutionCommand().setServiceName(null); stages.add(s); ActionDBAccessor db = mock(ActionDBAccessor.class); @@ -1668,19 +1796,40 @@ public class TestActionScheduler { return null; } }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class)); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + String host = (String) invocation.getArguments()[0]; + String role = (String) invocation.getArguments()[1]; + HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2]; + HostRoleCommand task = s.getHostRoleCommand(host, role); + + if (task.getStatus() == status) { + return Arrays.asList(task); + } else { + return null; + } + } + 
}).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class)); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION"); + } + }).when(db).getTask(anyLong()); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), new ServerActionManagerImpl(fsm), - unitOfWork, conf); + new HostsMap((String) null), unitOfWork, conf); int cycleCount = 0; - while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION") - .equals(HostRoleStatus.COMPLETED) && cycleCount <= MAX_CYCLE_ITERATIONS) { + while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION") + .equals(HostRoleStatus.COMPLETED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { scheduler.doWork(); + scheduler.getServerActionExecutor().doWork(); } - assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"), + assertEquals(stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"), HostRoleStatus.COMPLETED); } @@ -1800,14 +1949,13 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration conf = new Configuration(properties); - ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm); - ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), serverActionManager, unitOfWork, conf); + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, + new HostsMap((String) null), unitOfWork, conf); ActionManager am = new ActionManager( - 2, 2, aq, fsm, db, new HostsMap((String) null), - serverActionManager, unitOfWork, requestFactory, conf); + 2, 2, aq, fsm, db, new HostsMap((String) null), + unitOfWork, requestFactory, conf); scheduler.doWork(); @@ -1967,14 +2115,13 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration 
conf = new Configuration(properties); - ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm); - ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), serverActionManager, unitOfWork, conf); + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, + new HostsMap((String) null), unitOfWork, conf); ActionManager am = new ActionManager( - 2, 2, aq, fsm, db, new HostsMap((String) null), - serverActionManager, unitOfWork, requestFactory, conf); + 2, 2, aq, fsm, db, new HostsMap((String) null), + unitOfWork, requestFactory, conf); // Execution of request 1 http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java index 6c64b31..ad2d136 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java @@ -77,7 +77,6 @@ import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.HostsMap; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; -import org.apache.ambari.server.serveraction.ServerActionManager; import org.apache.ambari.server.state.Alert; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; @@ -723,7 +722,7 @@ public class TestHeartbeatHandler { clusters.addCluster(DummyCluster); ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class); ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db, - new HostsMap((String) null), 
null, unitOfWork, injector.getInstance(RequestFactory.class), null); + new HostsMap((String) null), unitOfWork, injector.getInstance(RequestFactory.class), null); populateActionDB(db, DummyHostname1); Stage stage = db.getAllStages(requestId).get(0); Assert.assertEquals(stageId, stage.getStageId()); @@ -2109,13 +2108,12 @@ public class TestHeartbeatHandler { private ActionManager getMockActionManager() { ActionQueue actionQueueMock = createNiceMock(ActionQueue.class); Clusters clustersMock = createNiceMock(Clusters.class); - ServerActionManager serverActionManagerMock = createNiceMock(ServerActionManager.class); Configuration configurationMock = createNiceMock(Configuration.class); ActionManager actionManager = createMockBuilder(ActionManager.class). addMockedMethod("getTasks"). withConstructor((long)0, (long)0, actionQueueMock, clustersMock, - actionDBAccessor, new HostsMap((String) null), serverActionManagerMock, unitOfWork, + actionDBAccessor, new HostsMap((String) null), unitOfWork, injector.getInstance(RequestFactory.class), configurationMock). 
createMock(); return actionManager; http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java index 2d898e5..a5b35e5 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java @@ -91,8 +91,6 @@ import org.apache.ambari.server.orm.dao.HostDAO; import org.apache.ambari.server.orm.entities.ExecutionCommandEntity; import org.apache.ambari.server.security.authorization.Users; import org.apache.ambari.server.serveraction.ServerAction; -import org.apache.ambari.server.serveraction.ServerActionManager; -import org.apache.ambari.server.serveraction.ServerActionManagerImpl; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; @@ -7138,25 +7136,6 @@ public class AmbariManagementControllerTest { assertEquals(0, response.getCustomCommands().size()); } - @Test - public void testServerActionForUpgradeFinalization() throws AmbariException { - String clusterName = "foo1"; - StackId currentStackId = new StackId("HDP-0.1"); - StackId newStackId = new StackId("HDP-0.2"); - - createCluster(clusterName); - Cluster c = clusters.getCluster(clusterName); - c.setDesiredStackVersion(currentStackId); - Assert.assertTrue(c.getCurrentStackVersion().equals(currentStackId)); - - ServerActionManager serverActionManager = new ServerActionManagerImpl(clusters); - Map<String, String> payload = new HashMap<String, String>(); - payload.put(ServerAction.PayloadName.CLUSTER_NAME, 
clusterName); - payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, newStackId.getStackId()); - serverActionManager.executeAction(ServerAction.Command.FINALIZE_UPGRADE, payload); - Assert.assertTrue(c.getCurrentStackVersion().equals(newStackId)); - } - // disabled as upgrade feature is disabled @Ignore @Test @@ -7485,9 +7464,7 @@ public class AmbariManagementControllerTest { currRoleOrder = expectedTasks.getRoleOrder(command.getRole()); ExecutionCommand execCommand = command.getExecutionCommandWrapper().getExecutionCommand(); Assert.assertTrue( - execCommand.getCommandParams().containsKey(ServerAction.PayloadName.CURRENT_STACK_VERSION)); - Assert.assertTrue( - execCommand.getCommandParams().containsKey(ServerAction.PayloadName.CLUSTER_NAME)); + execCommand.getRoleParams().containsKey(ServerAction.ACTION_NAME)); Assert.assertEquals(RoleCommand.EXECUTE, execCommand.getRoleCommand()); } else { Assert.assertTrue(command.toString(), expectedTasks.isTaskExpected(command.getRole(), command.getHostName())); http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java new file mode 100644 index 0000000..ba9a5af --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.serveraction; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.agent.CommandReport; +import org.apache.ambari.server.agent.ExecutionCommand; + +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +/** + * The MockServerAction is an implementation of a ServerAction strictly used for testing purposes. + * <p/> + * This class helps to generate several scenarios from success cases to failure cases. The + * force_fail command parameter can be used to generate different failure cases: + * <ul> + * <li>exception + * - Causes the action to fail by throwing an AmbariException</li> + * <li>timeout + * - Causes the action to fail by timing out (the COMMAND_TIMEOUT value must be set to a reasonable + * value)</li> + * </ul> + * + * If not instructed to fail, this implementation will attempt to increment a "data" counter in a + * shared data context - if available. 
+ */ +public class MockServerAction extends AbstractServerAction { + + public static final String PAYLOAD_FORCE_FAIL = "force_fail"; + + @Override + public CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext) + throws AmbariException, InterruptedException { + + Map<String, String> commandParameters = getCommandParameters(); + + if (commandParameters == null) { + throw new AmbariException("Missing payload"); + } else if ("exception".equalsIgnoreCase(commandParameters.get(PAYLOAD_FORCE_FAIL))) { + throw new AmbariException("Failing execution by request"); + } else if ("report".equalsIgnoreCase(commandParameters.get(PAYLOAD_FORCE_FAIL))) { + return createCommandReport(1, HostRoleStatus.FAILED, null, "Forced fail via command", "Failing execution by request"); + } else { + if ("timeout".equalsIgnoreCase(commandParameters.get(PAYLOAD_FORCE_FAIL))) { + Long timeout; + + try { + timeout = (commandParameters.containsKey(ExecutionCommand.KeyNames.COMMAND_TIMEOUT)) + ? Long.parseLong(commandParameters.get(ExecutionCommand.KeyNames.COMMAND_TIMEOUT)) * 1000 // Convert seconds to milliseconds + : null; + } catch (NumberFormatException e) { + timeout = null; + } + + if (timeout != null) { + Thread.sleep(timeout * 10); + } + } + + // Test updating the shared data context... 
+ if (requestSharedDataContext != null) { + Integer data = (Integer) requestSharedDataContext.get("Data"); + + if (data == null) { + data = 0; + } + + requestSharedDataContext.put("Data", ++data); + } + + return createCommandReport(0, HostRoleStatus.COMPLETED, null, "Success!", null); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java new file mode 100644 index 0000000..e89477a --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.serveraction; + +import org.apache.ambari.server.Role; +import org.apache.ambari.server.RoleCommand; +import org.apache.ambari.server.actionmanager.*; +import org.apache.ambari.server.agent.CommandReport; +import org.apache.ambari.server.agent.ExecutionCommand; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent; +import org.apache.ambari.server.utils.StageUtils; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ServerActionExecutorTest { + private static final int MAX_CYCLE_ITERATIONS = 1000; + private static final String SERVER_HOST_NAME = StageUtils.getHostName(); + private static final String CLUSTER_HOST_INFO = "{all_hosts=[" + + SERVER_HOST_NAME + "], slave_hosts=[" + + SERVER_HOST_NAME + "]}"; + + /** + * Test a normal server action + */ + @Test + public void testServerAction() throws Exception { + final Request request = createMockRequest(); + final Stage s = getStageWithServerAction(1, 977, null, "test", 300); + final List<Stage> stages = new ArrayList<Stage>() { + { + add(s); + } + }; + ActionDBAccessor db = createMockActionDBAccessor(request, stages); + ServerActionExecutor executor = new ServerActionExecutor(db, 10000); + + // Force the task to be QUEUED + s.getHostRoleCommand(SERVER_HOST_NAME, Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED); + + int cycleCount = 0; + while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= MAX_CYCLE_ITERATIONS)) { + executor.doWork(); + } + + assertEquals(HostRoleStatus.COMPLETED, getTaskStatus(s)); + } + + + /** + * Test a timeout server action + */ + @Test + public void testServerActionTimeout() throws 
Exception { + final Request request = createMockRequest(); + final Stage s = getStageWithServerAction(1, + 977, + new HashMap<String, String>() {{ + put(MockServerAction.PAYLOAD_FORCE_FAIL, "timeout"); + }}, + "test", + 1); + final List<Stage> stages = new ArrayList<Stage>() { + { + add(s); + } + }; + ActionDBAccessor db = createMockActionDBAccessor(request, stages); + ServerActionExecutor executor = new ServerActionExecutor(db, 10000); + + // Force the task to be QUEUED + s.getHostRoleCommand(SERVER_HOST_NAME, Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED); + + int cycleCount = 0; + while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= MAX_CYCLE_ITERATIONS)) { + executor.doWork(); + } + + assertEquals(HostRoleStatus.TIMEDOUT, getTaskStatus(s)); + } + + + /** + * Test a server action that fails by throwing an exception + */ + @Test + public void testServerActionFailedException() throws Exception { + final Request request = createMockRequest(); + final Stage s = getStageWithServerAction(1, + 977, + new HashMap<String, String>() {{ + put(MockServerAction.PAYLOAD_FORCE_FAIL, "exception"); + }}, + "test", + 1); + final List<Stage> stages = new ArrayList<Stage>() { + { + add(s); + } + }; + ActionDBAccessor db = createMockActionDBAccessor(request, stages); + ServerActionExecutor executor = new ServerActionExecutor(db, 10000); + + // Force the task to be QUEUED + s.getHostRoleCommand(SERVER_HOST_NAME, Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED); + + int cycleCount = 0; + while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= MAX_CYCLE_ITERATIONS)) { + executor.doWork(); + } + + assertEquals(HostRoleStatus.FAILED, getTaskStatus(s)); + } + + /** + * Test a server action that fails by returning a FAILED command report + */ + @Test + public void testServerActionFailedReport() throws Exception { + final Request request = createMockRequest(); + final Stage s = getStageWithServerAction(1, + 977, + new HashMap<String, String>() {{ + put(MockServerAction.PAYLOAD_FORCE_FAIL, 
"report"); + }}, + "test", + 1); + final List<Stage> stages = new ArrayList<Stage>() { + { + add(s); + } + }; + ActionDBAccessor db = createMockActionDBAccessor(request, stages); + ServerActionExecutor executor = new ServerActionExecutor(db, 10000); + + // Force the task to be QUEUED + s.getHostRoleCommand(SERVER_HOST_NAME, Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED); + + int cycleCount = 0; + while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= MAX_CYCLE_ITERATIONS)) { + executor.doWork(); + } + + assertEquals(HostRoleStatus.FAILED, getTaskStatus(s)); + } + + private HostRoleStatus getTaskStatus(List<Stage> stages, int i) { + return getTaskStatus(stages.get(i)); + } + + private HostRoleStatus getTaskStatus(Stage stage) { + return stage.getHostRoleStatus(SERVER_HOST_NAME, "AMBARI_SERVER_ACTION"); + } + + private Request createMockRequest() { + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(request.getRequestId()).thenReturn(1L); + return request; + } + + private ActionDBAccessor createMockActionDBAccessor(final Request request, final List<Stage> stages) { + ActionDBAccessor db = mock(ActionDBAccessor.class); + + when(db.getStagesInProgress()).thenReturn(stages); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + RequestStatus status = (RequestStatus) invocation.getArguments()[0]; + + if (status == RequestStatus.IN_PROGRESS) { + return Arrays.asList(request); + } else { + return Collections.emptyList(); + } + } + }).when(db).getRequestsByStatus(any(RequestStatus.class), anyInt(), anyBoolean()); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + String host = (String) invocation.getArguments()[0]; + String role = (String) invocation.getArguments()[3]; + CommandReport commandReport = (CommandReport) invocation.getArguments()[4]; + HostRoleCommand command = 
stages.get(0).getHostRoleCommand(host, role); + command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus())); + return null; + } + }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class)); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + String host = (String) invocation.getArguments()[0]; + String role = (String) invocation.getArguments()[1]; + HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2]; + + HostRoleCommand task = stages.get(0).getHostRoleCommand(host, role); + + if (task.getStatus() == status) { + return Arrays.asList(task); + } else { + return null; + } + } + }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class)); + + return db; + } + + private static Stage getStageWithServerAction(long requestId, long stageId, + Map<String, String> payload, String requestContext, + int timeout) { + Stage stage = new Stage(requestId, "/tmp", "cluster1", 1L, requestContext, CLUSTER_HOST_INFO, + "{}", "{}"); + + stage.setStageId(stageId); + stage.addServerActionCommand(MockServerAction.class.getName(), Role.AMBARI_SERVER_ACTION, + RoleCommand.EXECUTE, "cluster1", + new ServiceComponentHostServerActionEvent(SERVER_HOST_NAME, System.currentTimeMillis()), + payload, timeout); + + return stage; + } +} \ No newline at end of file