Repository: ambari
Updated Branches:
  refs/heads/trunk cc076cf34 -> 3d397dc04


http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 7224924..c5c5bde 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -25,14 +25,7 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
 import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -52,9 +45,7 @@ import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.serveraction.ServerAction;
-import org.apache.ambari.server.serveraction.ServerActionManager;
-import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
+import org.apache.ambari.server.serveraction.MockServerAction;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
@@ -65,6 +56,7 @@ import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
 import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
+import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
 import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
 import org.apache.ambari.server.utils.StageUtils;
 import org.easymock.Capture;
@@ -86,6 +78,8 @@ public class TestActionScheduler {
   private static final String CLUSTER_HOST_INFO_UPDATED = 
"{all_hosts=[c6401.ambari.apache.org,"
       + " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org,"
       + " c6402.ambari.apache.org]}";
+
+  private final String serverHostname = StageUtils.getHostName();
   private final String hostname = "ahost.ambari.apache.org";
   private final int MAX_CYCLE_ITERATIONS = 100;
 
@@ -96,7 +90,7 @@ public class TestActionScheduler {
    */
   @Test
   public void testActionSchedule() throws Exception {
-    
+
     Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
     Map<String, List<String>> clusterHostInfo = 
StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
 
@@ -116,7 +110,7 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-    
+
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
@@ -141,7 +135,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm,
-        10000, new HostsMap((String) null), null, unitOfWork, conf);
+        10000, new HostsMap((String) null), unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -167,7 +161,7 @@ public class TestActionScheduler {
       int expectedQueueSize, ActionScheduler scheduler) {
     int cycleCount = 0;
     while (cycleCount++ <= MAX_CYCLE_ITERATIONS) {
-      List<AgentCommand> ac = aq.dequeueAll(hostname);      
+      List<AgentCommand> ac = aq.dequeueAll(hostname);
       if (ac != null) {
         if (ac.size() == expectedQueueSize) {
           return ac;
@@ -239,7 +233,7 @@ public class TestActionScheduler {
 
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3,
-        new HostsMap((String) null), null, unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
 
@@ -305,12 +299,10 @@ public class TestActionScheduler {
       }
     }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), 
anyString());
 
-    ServerActionManager sam = 
EasyMock.createNiceMock(ServerActionManager.class);
-
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = 
EasyMock.createMockBuilder(ActionScheduler.class).
         withConstructor((long) 100, (long) 50, db, aq, fsm, 3,
-                new HostsMap((String) null), sam, unitOfWork, conf).
+            new HostsMap((String) null), unitOfWork, conf).
         addMockedMethod("cancelHostRoleCommands").
         createMock();
     
scheduler.cancelHostRoleCommands((Collection<HostRoleCommand>)EasyMock.anyObject(),EasyMock.anyObject(String.class));
@@ -425,7 +417,7 @@ public class TestActionScheduler {
 
     // Make sure the NN install doesn't timeout
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
-      new HostsMap((String) null), null, unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
 
     int cycleCount=0;
@@ -485,7 +477,13 @@ public class TestActionScheduler {
     ServiceComponent scomp = mock(ServiceComponent.class);
     ServiceComponentHost sch = mock(ServiceComponentHost.class);
     UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    Host host = mock(Host.class);
+
+    when(host.getHostName()).thenReturn(serverHostname);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+
     when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(fsm.getHost(anyString())).thenReturn(host);
     when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
@@ -493,14 +491,12 @@ public class TestActionScheduler {
 
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
-    hosts.put(hostname, sch);
+    hosts.put(serverHostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
     List<Stage> stages = new ArrayList<Stage>();
     Map<String, String> payload = new HashMap<String, String>();
-    payload.put(ServerAction.PayloadName.CLUSTER_NAME, "cluster1");
-    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2");
-    final Stage s = getStageWithServerAction(1, 977, hostname, payload, 
"test");
+    final Stage s = getStageWithServerAction(1, 977, payload, "test", 300);
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
@@ -522,21 +518,134 @@ public class TestActionScheduler {
       }
     }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), 
anyString(), any(CommandReport.class));
 
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
+      }
+    }).when(db).getTask(anyLong());
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
+
+        HostRoleCommand task = s.getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), 
any(HostRoleStatus.class));
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
-        unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
 
-    int cycleCount=0;
-    while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
+    int cycleCount = 0;
+    while (!stages.get(0).getHostRoleStatus(serverHostname, 
"AMBARI_SERVER_ACTION")
         .equals(HostRoleStatus.COMPLETED) && cycleCount++ <= 
MAX_CYCLE_ITERATIONS) {
       scheduler.doWork();
+      scheduler.getServerActionExecutor().doWork();
     }
 
-    assertEquals(stages.get(0).getHostRoleStatus(hostname, 
"AMBARI_SERVER_ACTION"),
+    assertEquals(stages.get(0).getHostRoleStatus(serverHostname, 
"AMBARI_SERVER_ACTION"),
         HostRoleStatus.COMPLETED);
+  }
+
+  /**
+   * Test that a server action which exceeds its timeout ends up TIMEDOUT
+   */
+  @Test
+  public void testServerActionTimeOut() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    Host host = mock(Host.class);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(serverHostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+    when(fsm.getHost(anyString())).thenReturn(host);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+    when(host.getHostName()).thenReturn(serverHostname);
+
+    List<Stage> stages = new ArrayList<Stage>();
+    Map<String, String> payload = new HashMap<String, String>();
+    payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "timeout");
+    final Stage s = getStageWithServerAction(1, 977, payload, "test", 2);
+    stages.add(s);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    Request request = mock(Request.class);
+    when(request.isExclusive()).thenReturn(false);
+    when(db.getRequest(anyLong())).thenReturn(request);
+
+    when(db.getStagesInProgress()).thenReturn(stages);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[3];
+        CommandReport commandReport = (CommandReport) 
invocation.getArguments()[4];
+        HostRoleCommand command = s.getHostRoleCommand(host, role);
+        command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
+        return null;
+      }
+    }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), 
anyString(), any(CommandReport.class));
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
+      }
+    }).when(db).getTask(anyLong());
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
+
+        HostRoleCommand task = s.getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), 
any(HostRoleStatus.class));
 
 
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, conf);
+
+    int cycleCount = 0;
+    while (!stages.get(0).getHostRoleStatus(serverHostname, 
"AMBARI_SERVER_ACTION").isCompletedState()
+        && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
+      scheduler.doWork();
+      scheduler.getServerActionExecutor().doWork();
+    }
+
+    assertEquals(HostRoleStatus.TIMEDOUT,
+        stages.get(0).getHostRoleStatus(serverHostname, 
"AMBARI_SERVER_ACTION"));
   }
 
   @Test
@@ -558,13 +667,13 @@ public class TestActionScheduler {
 
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
-    hosts.put(hostname, sch);
+    hosts.put(serverHostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
     List<Stage> stages = new ArrayList<Stage>();
     Map<String, String> payload = new HashMap<String, String>();
-    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2");
-    final Stage s = getStageWithServerAction(1, 977, hostname, payload, 
"test");
+    payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "exception");
+    final Stage s = getStageWithServerAction(1, 977, payload, "test", 300);
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
@@ -586,34 +695,57 @@ public class TestActionScheduler {
       }
     }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), 
anyString(), any(CommandReport.class));
 
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
+      }
+    }).when(db).getTask(anyLong());
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
+
+        HostRoleCommand task = s.getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), 
any(HostRoleStatus.class));
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm), 
unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
 
     int cycleCount = 0;
-    while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
+    while (!stages.get(0).getHostRoleStatus(serverHostname, 
"AMBARI_SERVER_ACTION")
         .equals(HostRoleStatus.FAILED) && cycleCount++ <= 
MAX_CYCLE_ITERATIONS) {
       scheduler.doWork();
+      scheduler.getServerActionExecutor().doWork();
     }
-    assertEquals(stages.get(0).getHostRoleStatus(hostname, 
"AMBARI_SERVER_ACTION"),
+    assertEquals(stages.get(0).getHostRoleStatus(serverHostname, 
"AMBARI_SERVER_ACTION"),
         HostRoleStatus.FAILED);
     assertEquals("test", stages.get(0).getRequestContext());
   }
 
-  private static Stage getStageWithServerAction(long requestId, long stageId, 
String hostName,
-                                                Map<String, String> payload, 
String requestContext) {
+  private static Stage getStageWithServerAction(long requestId, long stageId,
+                                                Map<String, String> payload, 
String requestContext,
+                                                int timeout) {
+    String serverHostname = StageUtils.getHostName();
     Stage stage = new Stage(requestId, "/tmp", "cluster1", 1L, requestContext, 
CLUSTER_HOST_INFO,
-      "", "");
+      "{}", "{}");
     stage.setStageId(stageId);
-    long now = System.currentTimeMillis();
-    stage.addServerActionCommand(ServerAction.Command.FINALIZE_UPGRADE, 
Role.AMBARI_SERVER_ACTION,
+
+    stage.addServerActionCommand(MockServerAction.class.getName(), 
Role.AMBARI_SERVER_ACTION,
         RoleCommand.EXECUTE, "cluster1",
-        new ServiceComponentHostUpgradeEvent("AMBARI_SERVER_ACTION", hostName, 
now, "HDP-0.2"),
-        hostName);
-    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
-        Role.AMBARI_SERVER_ACTION.toString()).getExecutionCommand();
+        new ServiceComponentHostServerActionEvent(serverHostname, 
System.currentTimeMillis()),
+        payload,
+        timeout);
 
-    execCmd.setCommandParams(payload);
     return stage;
   }
 
@@ -666,14 +798,14 @@ public class TestActionScheduler {
                     RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
 
     stages.add(
-            getStageWithSingleTask(
-                    hostname3, "cluster1", Role.DATANODE,
-                    RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
+        getStageWithSingleTask(
+            hostname3, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
 
     stages.add( // Stage with the same request id, should not be scheduled
-            getStageWithSingleTask(
-                    hostname4, "cluster1", Role.GANGLIA_MONITOR,
-                    RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
+        getStageWithSingleTask(
+            hostname4, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
 
@@ -686,12 +818,11 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), new ServerActionManagerImpl(fsm),
-            unitOfWork, conf);
+            new HostsMap((String) null), unitOfWork, conf);
 
     ActionManager am = new ActionManager(
-            2, 2, aq, fsm, db, new HostsMap((String) null),
-            new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, 
conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null),
+        unitOfWork, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -740,24 +871,24 @@ public class TestActionScheduler {
                     hostname1, "cluster1", Role.DATANODE,
                     RoleCommand.START, Service.Type.HDFS, 1, 1, 1));
     stages.add( // Stage with the same hostname, should not be scheduled
-            getStageWithSingleTask(
-                    hostname1, "cluster1", Role.GANGLIA_MONITOR,
-                    RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
+        getStageWithSingleTask(
+            hostname1, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
 
     stages.add(
-            getStageWithSingleTask(
-                    hostname2, "cluster1", Role.DATANODE,
-                    RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
+        getStageWithSingleTask(
+            hostname2, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
 
     stages.add(
-            getStageWithSingleTask(
-                    hostname3, "cluster1", Role.DATANODE,
-                    RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
+        getStageWithSingleTask(
+            hostname3, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
 
     stages.add( // Stage with the same request id, should not be scheduled
-            getStageWithSingleTask(
-                    hostname4, "cluster1", Role.GANGLIA_MONITOR,
-                    RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
+        getStageWithSingleTask(
+            hostname4, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
 
@@ -771,13 +902,13 @@ public class TestActionScheduler {
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+            new HostsMap((String) null),
             unitOfWork, conf);
 
     ActionManager am = new ActionManager(
-            2, 2, aq, fsm, db, new HostsMap((String) null),
-            new ServerActionManagerImpl(fsm), unitOfWork,
-            requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null),
+        unitOfWork,
+        requestFactory, conf);
 
     scheduler.doWork();
 
@@ -805,7 +936,7 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-    
+
     String hostname1 = "ahost.ambari.apache.org";
     String hostname2 = "bhost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
@@ -813,26 +944,26 @@ public class TestActionScheduler {
     hosts.put(hostname1, sch);
     hosts.put(hostname2, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
-    
+
     List<Stage> stages = new ArrayList<Stage>();
     Stage backgroundStage = null;
     stages.add(//stage with background command
         backgroundStage = getStageWithSingleTask(
             hostname1, "cluster1", Role.NAMENODE, RoleCommand.CUSTOM_COMMAND, 
"REBALANCEHDFS", Service.Type.HDFS, 1, 1, 1));
-    
+
     Assert.assertEquals(AgentCommandType.BACKGROUND_EXECUTION_COMMAND 
,backgroundStage.getExecutionCommands(hostname1).get(0).getExecutionCommand().getCommandType());
-    
+
     stages.add( // Stage with the same hostname, should be scheduled
         getStageWithSingleTask(
             hostname1, "cluster1", Role.GANGLIA_MONITOR,
             RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
-    
+
     stages.add(
         getStageWithSingleTask(
             hostname2, "cluster1", Role.DATANODE,
             RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
-    
-    
+
+
     ActionDBAccessor db = mock(ActionDBAccessor.class);
 
     Request request = mock(Request.class);
@@ -840,21 +971,21 @@ public class TestActionScheduler {
     when(db.getRequest(anyLong())).thenReturn(request);
 
     when(db.getStagesInProgress()).thenReturn(stages);
-    
+
     Properties properties = new Properties();
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+        new HostsMap((String) null),
         unitOfWork, conf);
-    
+
     ActionManager am = new ActionManager(
         2, 2, aq, fsm, db, new HostsMap((String) null),
-        new ServerActionManagerImpl(fsm), unitOfWork,
+        unitOfWork,
         requestFactory, conf);
-    
+
     scheduler.doWork();
-    
+
     Assert.assertEquals(HostRoleStatus.QUEUED, 
stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
     Assert.assertEquals(HostRoleStatus.QUEUED, 
stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
 
@@ -966,12 +1097,11 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ServerActionManagerImpl serverActionManager = new 
ServerActionManagerImpl(fsm);
 
     Capture<Collection<HostRoleCommand>> cancelCommandList = new 
Capture<Collection<HostRoleCommand>>();
     ActionScheduler scheduler = 
EasyMock.createMockBuilder(ActionScheduler.class).
         withConstructor((long)100, (long)50, db, aq, fsm, 3,
-          new HostsMap((String) null), serverActionManager,
+          new HostsMap((String) null),
           unitOfWork, conf).
           addMockedMethod("cancelHostRoleCommands").
           createMock();
@@ -981,7 +1111,7 @@ public class TestActionScheduler {
     EasyMock.replay(scheduler);
 
     ActionManager am = new ActionManager(
-        2, 2, aq, fsm, db, new HostsMap((String) null), serverActionManager, 
unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, 
requestFactory, conf);
 
     scheduler.doWork();
 
@@ -1140,10 +1270,10 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+        new HostsMap((String) null),
         unitOfWork, conf);
     ActionManager am = new ActionManager(
-        2, 10000, aq, fsm, db, new HostsMap((String) null), new 
ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf);
+        2, 10000, aq, fsm, db, new HostsMap((String) null), unitOfWork, 
requestFactory, conf);
 
     scheduler.doWork();
 
@@ -1326,9 +1456,9 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        new ServerActionManagerImpl(fsm), unitOfWork, conf);
+        unitOfWork, conf);
     ActionManager am = new ActionManager(
-        2, 2, aq, fsm, db, new HostsMap((String) null), new 
ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, 
requestFactory, conf);
 
     scheduler.doWork();
 
@@ -1419,7 +1549,7 @@ public class TestActionScheduler {
     assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.NAMENODE)));
     assertEquals(new Float(1.0), new 
Float(s.getSuccessFactor(Role.GANGLIA_SERVER)));
   }
-  
+
   @Test
   public void testSuccessCriteria() {
     RoleStats rs1 = new RoleStats(1, (float)0.5);
@@ -1427,37 +1557,37 @@ public class TestActionScheduler {
     assertTrue(rs1.isSuccessFactorMet());
     rs1.numSucceeded = 0;
     assertFalse(rs1.isSuccessFactorMet());
-    
+
     RoleStats rs2 = new RoleStats(2, (float)0.5);
     rs2.numSucceeded = 1;
     assertTrue(rs2.isSuccessFactorMet());
-    
+
     RoleStats rs3 = new RoleStats(3, (float)0.5);
     rs3.numSucceeded = 2;
     assertTrue(rs2.isSuccessFactorMet());
     rs3.numSucceeded = 1;
     assertFalse(rs3.isSuccessFactorMet());
-    
+
     RoleStats rs4 = new RoleStats(3, (float)1.0);
     rs4.numSucceeded = 2;
     assertFalse(rs3.isSuccessFactorMet());
   }
-  
+
   /**
    * This test verifies that ActionScheduler returns up-to-date cluster 
host info and caching works correctly.
    */
   @Test
   public void testClusterHostInfoCache() throws Exception {
-    
+
     Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
-    
+
     //Data for stages
     Map<String, Set<String>> clusterHostInfo1 = 
StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
     Map<String, Set<String>> clusterHostInfo2 = 
StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type);
     int stageId = 1;
     int requestId1 = 1;
     int requestId2 = 2;
-    
+
     ActionQueue aq = new ActionQueue();
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
@@ -1497,19 +1627,19 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
-        10000, new HostsMap((String) null), null, unitOfWork, conf);
+        10000, new HostsMap((String) null), unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
 
     assertTrue(ac.get(0) instanceof ExecutionCommand);
     assertEquals(String.valueOf(requestId1) + "-" + stageId, 
((ExecutionCommand) (ac.get(0))).getCommandId());
-    
+
     assertEquals(clusterHostInfo1, ((ExecutionCommand) 
(ac.get(0))).getClusterHostInfo());
-    
+
 
     when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2));
-    
+
     //Verify that ActionScheduler does not return cached value of cluster host 
info for new requestId
     ac = waitForQueueSize(hostname, aq, 1, scheduler);
     assertTrue(ac.get(0) instanceof ExecutionCommand);
@@ -1576,7 +1706,7 @@ public class TestActionScheduler {
     when(db.getStagesInProgress()).thenReturn(stages);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
-            new HostsMap((String) null), null, unitOfWork, conf);
+            new HostsMap((String) null), unitOfWork, conf);
 
     final CountDownLatch abortCalls = new CountDownLatch(2);
 
@@ -1639,15 +1769,13 @@ public class TestActionScheduler {
 
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
-    hosts.put(hostname, sch);
+    hosts.put(serverHostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
     List<Stage> stages = new ArrayList<Stage>();
     Map<String, String> payload = new HashMap<String, String>();
-    payload.put(ServerAction.PayloadName.CLUSTER_NAME, "cluster1");
-    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2");
-    final Stage s = getStageWithServerAction(1, 977, hostname, payload, 
"test");
-    
s.getExecutionCommands().get("ahost.ambari.apache.org").get(0).getExecutionCommand().setServiceName(null);
+    final Stage s = getStageWithServerAction(1, 977, payload, "test", 300);
+    
s.getExecutionCommands().get(serverHostname).get(0).getExecutionCommand().setServiceName(null);
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
@@ -1668,19 +1796,40 @@ public class TestActionScheduler {
         return null;
       }
     }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), 
anyString(), any(CommandReport.class));
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
 
+        HostRoleCommand task = s.getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), 
any(HostRoleStatus.class));
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
+      }
+    }).when(db).getTask(anyLong());
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), new ServerActionManagerImpl(fsm),
-            unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
 
     int cycleCount = 0;
-    while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
-            .equals(HostRoleStatus.COMPLETED) && cycleCount <= 
MAX_CYCLE_ITERATIONS) {
+    while (!stages.get(0).getHostRoleStatus(serverHostname, 
"AMBARI_SERVER_ACTION")
+        .equals(HostRoleStatus.COMPLETED) && cycleCount++ <= 
MAX_CYCLE_ITERATIONS) {
       scheduler.doWork();
+      scheduler.getServerActionExecutor().doWork();
     }
 
-    assertEquals(stages.get(0).getHostRoleStatus(hostname, 
"AMBARI_SERVER_ACTION"),
+    assertEquals(stages.get(0).getHostRoleStatus(serverHostname, 
"AMBARI_SERVER_ACTION"),
             HostRoleStatus.COMPLETED);
   }
 
@@ -1800,14 +1949,13 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ServerActionManagerImpl serverActionManager = new 
ServerActionManagerImpl(fsm);
 
-    ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3,
-                    new HostsMap((String) null), serverActionManager, 
unitOfWork, conf);
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, conf);
 
     ActionManager am = new ActionManager(
-            2, 2, aq, fsm, db, new HostsMap((String) null),
-            serverActionManager, unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null),
+        unitOfWork, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -1967,14 +2115,13 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ServerActionManagerImpl serverActionManager = new 
ServerActionManagerImpl(fsm);
 
-    ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), serverActionManager, unitOfWork, 
conf);
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, conf);
 
     ActionManager am = new ActionManager(
-            2, 2, aq, fsm, db, new HostsMap((String) null),
-            serverActionManager, unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null),
+        unitOfWork, requestFactory, conf);
 
     // Execution of request 1
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index 6c64b31..ad2d136 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -77,7 +77,6 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
-import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
@@ -723,7 +722,7 @@ public class TestHeartbeatHandler {
     clusters.addCluster(DummyCluster);
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), 
clusters, db,
-        new HostsMap((String) null), null, unitOfWork, 
injector.getInstance(RequestFactory.class), null);
+        new HostsMap((String) null), unitOfWork, 
injector.getInstance(RequestFactory.class), null);
     populateActionDB(db, DummyHostname1);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -2109,13 +2108,12 @@ public class TestHeartbeatHandler {
   private ActionManager getMockActionManager() {
     ActionQueue actionQueueMock = createNiceMock(ActionQueue.class);
     Clusters clustersMock = createNiceMock(Clusters.class);
-    ServerActionManager serverActionManagerMock = 
createNiceMock(ServerActionManager.class);
     Configuration configurationMock = createNiceMock(Configuration.class);
 
     ActionManager actionManager = createMockBuilder(ActionManager.class).
             addMockedMethod("getTasks").
             withConstructor((long)0, (long)0, actionQueueMock, clustersMock,
-                    actionDBAccessor, new HostsMap((String) null), 
serverActionManagerMock, unitOfWork,
+                    actionDBAccessor, new HostsMap((String) null), unitOfWork,
                     injector.getInstance(RequestFactory.class), 
configurationMock).
             createMock();
     return actionManager;

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
index 2d898e5..a5b35e5 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
@@ -91,8 +91,6 @@ import org.apache.ambari.server.orm.dao.HostDAO;
 import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
 import org.apache.ambari.server.security.authorization.Users;
 import org.apache.ambari.server.serveraction.ServerAction;
-import org.apache.ambari.server.serveraction.ServerActionManager;
-import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
@@ -7138,25 +7136,6 @@ public class AmbariManagementControllerTest {
     assertEquals(0, response.getCustomCommands().size());
   }
 
-  @Test
-  public void testServerActionForUpgradeFinalization() throws AmbariException {
-    String clusterName = "foo1";
-    StackId currentStackId = new StackId("HDP-0.1");
-    StackId newStackId = new StackId("HDP-0.2");
-
-    createCluster(clusterName);
-    Cluster c = clusters.getCluster(clusterName);
-    c.setDesiredStackVersion(currentStackId);
-    Assert.assertTrue(c.getCurrentStackVersion().equals(currentStackId));
-
-    ServerActionManager serverActionManager = new 
ServerActionManagerImpl(clusters);
-    Map<String, String> payload = new HashMap<String, String>();
-    payload.put(ServerAction.PayloadName.CLUSTER_NAME, clusterName);
-    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, 
newStackId.getStackId());
-    serverActionManager.executeAction(ServerAction.Command.FINALIZE_UPGRADE, 
payload);
-    Assert.assertTrue(c.getCurrentStackVersion().equals(newStackId));
-  }
-
   // disabled as upgrade feature is disabled
   @Ignore
   @Test
@@ -7485,9 +7464,7 @@ public class AmbariManagementControllerTest {
           currRoleOrder = expectedTasks.getRoleOrder(command.getRole());
           ExecutionCommand execCommand = 
command.getExecutionCommandWrapper().getExecutionCommand();
           Assert.assertTrue(
-              
execCommand.getCommandParams().containsKey(ServerAction.PayloadName.CURRENT_STACK_VERSION));
-          Assert.assertTrue(
-              
execCommand.getCommandParams().containsKey(ServerAction.PayloadName.CLUSTER_NAME));
+              
execCommand.getRoleParams().containsKey(ServerAction.ACTION_NAME));
           Assert.assertEquals(RoleCommand.EXECUTE, 
execCommand.getRoleCommand());
         } else {
           Assert.assertTrue(command.toString(), 
expectedTasks.isTaskExpected(command.getRole(), command.getHostName()));

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java
new file mode 100644
index 0000000..ba9a5af
--- /dev/null
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * The MockServerAction is an implementation of a ServerAction strictly used 
for testing purposes.
+ * <p/>
+ * This class helps to generate several scenarios from success cases to 
failure cases.  The
+ * force_fail command parameter can be used to generate different failure 
cases:
+ * <ul>
+ * <li>exception
+ * - Causes the action to fail by throwing an AmbariException</li>
+ * <li>timeout
+ * - Causes the action to fail by timing out (the COMMAND_TIMEOUT value must 
be set to a reasonable
+ * value)</li>
+ * </ul>
+ *
+ * If not instructed to fail, this implementation will attempt to increment a 
"data" counter in a
+ * shared data context - if available.
+ */
+public class MockServerAction extends AbstractServerAction {
+
+  public static final String PAYLOAD_FORCE_FAIL = "force_fail";
+
+  @Override
+  public CommandReport execute(ConcurrentMap<String, Object> 
requestSharedDataContext)
+      throws AmbariException, InterruptedException {
+
+    Map<String, String> commandParameters = getCommandParameters();
+
+    if (commandParameters == null) {
+      throw new AmbariException("Missing payload");
+    } else if 
("exception".equalsIgnoreCase(commandParameters.get(PAYLOAD_FORCE_FAIL))) {
+      throw new AmbariException("Failing execution by request");
+    } else if 
("report".equalsIgnoreCase(commandParameters.get(PAYLOAD_FORCE_FAIL))) {
+      return createCommandReport(1, HostRoleStatus.FAILED, null, "Forced fail 
via command", "Failing execution by request");
+    } else {
+      if 
("timeout".equalsIgnoreCase(commandParameters.get(PAYLOAD_FORCE_FAIL))) {
+        Long timeout;
+
+        try {
+          timeout = 
(commandParameters.containsKey(ExecutionCommand.KeyNames.COMMAND_TIMEOUT))
+              ? 
Long.parseLong(commandParameters.get(ExecutionCommand.KeyNames.COMMAND_TIMEOUT))
 * 1000 // Convert seconds to milliseconds
+              : null;
+        } catch (NumberFormatException e) {
+          timeout = null;
+        }
+
+        if (timeout != null) {
+          Thread.sleep(timeout * 10);
+        }
+      }
+
+      // Test updating the shared data context...
+      if (requestSharedDataContext != null) {
+        Integer data = (Integer) requestSharedDataContext.get("Data");
+
+        if (data == null) {
+          data = 0;
+        }
+
+        requestSharedDataContext.put("Data", ++data);
+      }
+
+      return createCommandReport(0, HostRoleStatus.COMPLETED, null, 
"Success!", null);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java
new file mode 100644
index 0000000..e89477a
--- /dev/null
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.*;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ServerActionExecutorTest {
+  private static final int MAX_CYCLE_ITERATIONS = 1000;
+  private static final String SERVER_HOST_NAME = StageUtils.getHostName();
+  private static final String CLUSTER_HOST_INFO = "{all_hosts=["
+      + SERVER_HOST_NAME + "], slave_hosts=["
+      + SERVER_HOST_NAME + "]}";
+
+  /**
+   * Test a normal server action
+   */
+  @Test
+  public void testServerAction() throws Exception {
+    final Request request = createMockRequest();
+    final Stage s = getStageWithServerAction(1, 977, null, "test", 300);
+    final List<Stage> stages = new ArrayList<Stage>() {
+      {
+        add(s);
+      }
+    };
+    ActionDBAccessor db = createMockActionDBAccessor(request, stages);
+    ServerActionExecutor executor = new ServerActionExecutor(db, 10000);
+
+    // Force the task to be QUEUED
+    s.getHostRoleCommand(SERVER_HOST_NAME, 
Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED);
+
+    int cycleCount = 0;
+    while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= 
MAX_CYCLE_ITERATIONS)) {
+      executor.doWork();
+    }
+
+    assertEquals(HostRoleStatus.COMPLETED, getTaskStatus(s));
+  }
+
+
+  /**
+   * Test a timeout server action
+   */
+  @Test
+  public void testServerActionTimeout() throws Exception {
+    final Request request = createMockRequest();
+    final Stage s = getStageWithServerAction(1,
+        977,
+        new HashMap<String, String>() {{
+          put(MockServerAction.PAYLOAD_FORCE_FAIL, "timeout");
+        }},
+        "test",
+        1);
+    final List<Stage> stages = new ArrayList<Stage>() {
+      {
+        add(s);
+      }
+    };
+    ActionDBAccessor db = createMockActionDBAccessor(request, stages);
+    ServerActionExecutor executor = new ServerActionExecutor(db, 10000);
+
+    // Force the task to be QUEUED
+    s.getHostRoleCommand(SERVER_HOST_NAME, 
Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED);
+
+    int cycleCount = 0;
+    while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= 
MAX_CYCLE_ITERATIONS)) {
+      executor.doWork();
+    }
+
+    assertEquals(HostRoleStatus.TIMEDOUT, getTaskStatus(s));
+  }
+
+
+  /**
+   * Test a server action that fails by throwing an exception
+   */
+  @Test
+  public void testServerActionFailedException() throws Exception {
+    final Request request = createMockRequest();
+    final Stage s = getStageWithServerAction(1,
+        977,
+        new HashMap<String, String>() {{
+          put(MockServerAction.PAYLOAD_FORCE_FAIL, "exception");
+        }},
+        "test",
+        1);
+    final List<Stage> stages = new ArrayList<Stage>() {
+      {
+        add(s);
+      }
+    };
+    ActionDBAccessor db = createMockActionDBAccessor(request, stages);
+    ServerActionExecutor executor = new ServerActionExecutor(db, 10000);
+
+    // Force the task to be QUEUED
+    s.getHostRoleCommand(SERVER_HOST_NAME, 
Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED);
+
+    int cycleCount = 0;
+    while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= 
MAX_CYCLE_ITERATIONS)) {
+      executor.doWork();
+    }
+
+    assertEquals(HostRoleStatus.FAILED, getTaskStatus(s));
+  }
+
+  /**
+   * Test a server action that fails by returning a FAILED command report
+   */
+  @Test
+  public void testServerActionFailedReport() throws Exception {
+    final Request request = createMockRequest();
+    final Stage s = getStageWithServerAction(1,
+        977,
+        new HashMap<String, String>() {{
+          put(MockServerAction.PAYLOAD_FORCE_FAIL, "report");
+        }},
+        "test",
+        1);
+    final List<Stage> stages = new ArrayList<Stage>() {
+      {
+        add(s);
+      }
+    };
+    ActionDBAccessor db = createMockActionDBAccessor(request, stages);
+    ServerActionExecutor executor = new ServerActionExecutor(db, 10000);
+
+    // Force the task to be QUEUED
+    s.getHostRoleCommand(SERVER_HOST_NAME, 
Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED);
+
+    int cycleCount = 0;
+    while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= 
MAX_CYCLE_ITERATIONS)) {
+      executor.doWork();
+    }
+
+    assertEquals(HostRoleStatus.FAILED, getTaskStatus(s));
+  }
+
+  private HostRoleStatus getTaskStatus(List<Stage> stages, int i) {
+    return getTaskStatus(stages.get(i));
+  }
+
+  private HostRoleStatus getTaskStatus(Stage stage) {
+    return stage.getHostRoleStatus(SERVER_HOST_NAME, "AMBARI_SERVER_ACTION");
+  }
+
+  private Request createMockRequest() {
+    Request request = mock(Request.class);
+    when(request.isExclusive()).thenReturn(false);
+    when(request.getRequestId()).thenReturn(1L);
+    return request;
+  }
+
+  private ActionDBAccessor createMockActionDBAccessor(final Request request, 
final List<Stage> stages) {
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    when(db.getStagesInProgress()).thenReturn(stages);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        RequestStatus status = (RequestStatus) invocation.getArguments()[0];
+
+        if (status == RequestStatus.IN_PROGRESS) {
+          return Arrays.asList(request);
+        } else {
+          return Collections.emptyList();
+        }
+      }
+    }).when(db).getRequestsByStatus(any(RequestStatus.class), anyInt(), 
anyBoolean());
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[3];
+        CommandReport commandReport = (CommandReport) 
invocation.getArguments()[4];
+        HostRoleCommand command = stages.get(0).getHostRoleCommand(host, role);
+        command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
+        return null;
+      }
+    }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), 
anyString(), any(CommandReport.class));
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
+
+        HostRoleCommand task = stages.get(0).getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), 
any(HostRoleStatus.class));
+
+    return db;
+  }
+
+  private static Stage getStageWithServerAction(long requestId, long stageId,
+                                                Map<String, String> payload, 
String requestContext,
+                                                int timeout) {
+    Stage stage = new Stage(requestId, "/tmp", "cluster1", 1L, requestContext, 
CLUSTER_HOST_INFO,
+        "{}", "{}");
+
+    stage.setStageId(stageId);
+    stage.addServerActionCommand(MockServerAction.class.getName(), 
Role.AMBARI_SERVER_ACTION,
+        RoleCommand.EXECUTE, "cluster1",
+        new ServiceComponentHostServerActionEvent(SERVER_HOST_NAME, 
System.currentTimeMillis()),
+        payload, timeout);
+
+    return stage;
+  }
+}
\ No newline at end of file

Reply via email to