Repository: ambari
Updated Branches:
  refs/heads/trunk 7bdfd3b54 -> be25c9e77


AMBARI-15671. On Ambari Agent restart currently running commands on that agent 
should be immediately aborted. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/be25c9e7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/be25c9e7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/be25c9e7

Branch: refs/heads/trunk
Commit: be25c9e77197be8e056c1bb9aa3651c16dd6fd62
Parents: 7bdfd3b
Author: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com>
Authored: Thu Apr 28 18:52:40 2016 +0300
Committer: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com>
Committed: Thu May 12 18:40:27 2016 +0300

----------------------------------------------------------------------
 .../server/actionmanager/ActionDBAccessor.java  |   6 +
 .../actionmanager/ActionDBAccessorImpl.java     |  15 +-
 .../server/actionmanager/ActionScheduler.java   |  31 +++-
 .../actionmanager/TestActionScheduler.java      | 151 ++++++++++++++++---
 4 files changed, 177 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/be25c9e7/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 9aba4c9..dcfe359 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -65,6 +65,12 @@ public interface ActionDBAccessor {
   public void timeoutHostRole(String host, long requestId, long stageId, 
String role);
 
   /**
+   * Mark the task as to have timed out
+   */
+  void timeoutHostRole(String host, long requestId, long stageId,
+                       String role, boolean skipSupported);
+
+  /**
    * Returns all the pending stages, including queued and not-queued. A stage 
is
    * considered in progress if it is in progress for any host.
    * <p/>

http://git-wip-us.apache.org/repos/asf/ambari/blob/be25c9e7/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 06311c2..8e6fb3f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -236,11 +236,22 @@ public class ActionDBAccessorImpl implements 
ActionDBAccessor {
   @Override
   public void timeoutHostRole(String host, long requestId, long stageId,
                               String role) {
+    timeoutHostRole(host, requestId, stageId, role, false);
+  }
+
+  @Override
+  public void timeoutHostRole(String host, long requestId, long stageId,
+                              String role, boolean skipSupported) {
     long now = System.currentTimeMillis();
     List<HostRoleCommandEntity> commands =
-        hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
+            hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
     for (HostRoleCommandEntity command : commands) {
-      command.setStatus(command.isRetryAllowed() ? 
HostRoleStatus.HOLDING_TIMEDOUT : HostRoleStatus.TIMEDOUT);
+      if (skipSupported) {
+        command.setStatus(HostRoleStatus.SKIPPED_FAILED);
+      } else {
+        command.setStatus(command.isRetryAllowed() ? 
HostRoleStatus.HOLDING_TIMEDOUT : HostRoleStatus.TIMEDOUT);
+      }
+
       command.setEndTime(now);
 
       auditLog(command, requestId);

http://git-wip-us.apache.org/repos/asf/ambari/blob/be25c9e7/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 95d1763..33c0a1f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -590,7 +590,7 @@ class ActionScheduler implements Runnable {
    * @return the stats for the roles in the stage which are used to determine
    * whether stage has succeeded or failed
    */
-  private Map<String, RoleStats> processInProgressStage(Stage s,
+  protected Map<String, RoleStats> processInProgressStage(Stage s,
       List<ExecutionCommand> commandsToSchedule) throws AmbariException {
     LOG.debug("==> Collecting commands to schedule...");
     // Map to track role status
@@ -694,12 +694,17 @@ class ActionScheduler implements Runnable {
           LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:" + 
s.getActionId() + " timed out");
           if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
             LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:" + 
s.getActionId() + " expired");
-            db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), 
c.getRole());
+            db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), 
c.getRole(), s.isAutoSkipOnFailureSupported());
             //Reinitialize status
             status = s.getHostRoleStatus(host, roleStr);
 
             if (null != cluster) {
-              transitionToFailedState(cluster.getClusterName(), 
c.getServiceName(), roleStr, host, now, false);
+              if (!RoleCommand.CUSTOM_COMMAND.equals(c.getRoleCommand())
+                && !RoleCommand.SERVICE_CHECK.equals(c.getRoleCommand())
+                && !RoleCommand.ACTIONEXECUTE.equals(c.getRoleCommand())) {
+                //commands above don't affect host component state (e.g. no 
in_progress state in process), transition will fail
+                transitionToFailedState(cluster.getClusterName(), 
c.getServiceName(), roleStr, host, now, false);
+              }
               if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
                 processActionDeath(cluster.getClusterName(), c.getHostname(), 
roleStr);
               }
@@ -833,6 +838,19 @@ class ActionScheduler implements Runnable {
   }
 
   /**
+   * Checks if ambari-agent was restarted during role command execution
+   * @param host the host with ambari-agent to check
+   * @param stage the stage
+   * @param role the role to check
+   * @return {@code true} if ambari-agent was restarted
+   */
+  protected boolean wasAgentRestartedDuringOperation(Host host, Stage stage, 
String role) {
+    String hostName = host.getHostName();
+    long taskStartTime = stage.getHostRoleCommand(hostName, 
role).getStartTime();
+    return taskStartTime > 0 && taskStartTime <= 
host.getLastRegistrationTime();
+  }
+
+  /**
    * Checks if timeout is required.
    * @param status      the status of the current role
    * @param stage       the stage
@@ -843,7 +861,7 @@ class ActionScheduler implements Runnable {
    * @return {@code true} if timeout is needed
    * @throws AmbariException
    */
-  private boolean timeOutActionNeeded(HostRoleStatus status, Stage stage,
+  protected boolean timeOutActionNeeded(HostRoleStatus status, Stage stage,
       Host host, String role, long currentTime, long taskTimeout) throws
     AmbariException {
     if (( !status.equals(HostRoleStatus.QUEUED) ) &&
@@ -852,8 +870,9 @@ class ActionScheduler implements Runnable {
     }
 
     // Fast fail task if host state is unknown
-    if (null != host && host.getState().equals(HostState.HEARTBEAT_LOST)) {
-      LOG.debug("Timing out action since agent is not heartbeating.");
+    if (null != host &&
+      (host.getState().equals(HostState.HEARTBEAT_LOST) || 
wasAgentRestartedDuringOperation(host, stage, role))) {
+      LOG.debug("Timing out action since agent is not heartbeating or agent 
was restarted.");
       return true;
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/be25c9e7/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index af6fb9b..7a8890d 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -20,14 +20,12 @@ package org.apache.ambari.server.actionmanager;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollectionOf;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -59,6 +57,7 @@ import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
+import org.apache.ambari.server.events.AmbariEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -75,6 +74,7 @@ import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
 import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
 import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
@@ -299,7 +299,7 @@ public class TestActionScheduler {
         command.setStatus(HostRoleStatus.TIMEDOUT);
         return null;
       }
-    }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), 
anyString());
+    }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), 
anyString(), anyBoolean());
 
 
     //Small action timeout to test rescheduling
@@ -379,7 +379,7 @@ public class TestActionScheduler {
         command.setStatus(HostRoleStatus.TIMEDOUT);
         return null;
       }
-    }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), 
anyString());
+    }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), 
anyString(), anyBoolean());
 
     //Small action timeout to test rescheduling
     AmbariEventPublisher aep = 
EasyMock.createNiceMock(AmbariEventPublisher.class);
@@ -479,7 +479,7 @@ public class TestActionScheduler {
         }
         return null;
       }
-    }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), 
anyString());
+    }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), 
anyString(), anyBoolean());
 
     doAnswer(new Answer<Void>() {
       @Override
@@ -792,6 +792,109 @@ public class TestActionScheduler {
   }
 
   @Test
+  public void testTimeOutWithHostNull() throws AmbariException {
+    Stage s = getStageWithServerAction(1, 977, null, "test", 2);
+    s.setHostRoleStatus(null, Role.AMBARI_SERVER_ACTION.toString(), 
HostRoleStatus.IN_PROGRESS);
+
+    ActionScheduler scheduler = 
EasyMock.createMockBuilder(ActionScheduler.class)
+      .withConstructor(long.class, long.class, ActionDBAccessor.class, 
ActionQueue.class, Clusters.class, int.class,
+        HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, 
Configuration.class)
+      .withArgs(100L, 50L, null, null, null, -1, null, null, null, null)
+      .createNiceMock();
+
+    EasyMock.replay(scheduler);
+
+    // currentTime should be set to -1 and taskTimeout to 1 because it is 
needed for timeOutActionNeeded method will return false value
+    Assert.assertEquals(false, 
scheduler.timeOutActionNeeded(HostRoleStatus.IN_PROGRESS, s, null, 
Role.AMBARI_SERVER_ACTION.toString(), -1L, 1L));
+
+    EasyMock.verify(scheduler);
+  }
+
+  @Test
+  public void testTimeoutRequestDueAgentRestartExecuteCommand() throws 
Exception {
+    testTimeoutRequest(RoleCommand.EXECUTE);
+  }
+
+  @Test
+  public void testTimeoutRequestDueAgentRestartCustomCommand() throws 
Exception {
+    testTimeoutRequest(RoleCommand.CUSTOM_COMMAND);
+  }
+
+  @Test
+  public void testTimeoutRequestDueAgentRestartActionExecute() throws 
Exception {
+    testTimeoutRequest(RoleCommand.ACTIONEXECUTE);
+  }
+
+  @Test
+  public void testTimeoutRequestDueAgentRestartServiceCheck() throws Exception 
{
+    testTimeoutRequest(RoleCommand.SERVICE_CHECK);
+  }
+
+  private void testTimeoutRequest(RoleCommand roleCommand) throws 
AmbariException, InvalidStateTransitionException {
+    final long HOST_REGISTRATION_TIME = 100L;
+    final long STAGE_TASK_START_TIME = HOST_REGISTRATION_TIME - 1L;
+
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = EasyMock.createMock(Clusters.class);
+    Cluster cluster = EasyMock.createMock(Cluster.class);
+    Service service = EasyMock.createMock(Service.class);
+    ServiceComponent serviceComponent = 
EasyMock.createMock(ServiceComponent.class);
+    ServiceComponentHost serviceComponentHost = 
EasyMock.createMock(ServiceComponentHost.class);
+    Host host = EasyMock.createMock(Host.class);
+    ActionDBAccessor db = EasyMock.createMock(ActionDBAccessor.class);
+    AmbariEventPublisher ambariEventPublisher = 
EasyMock.createMock(AmbariEventPublisher.class);
+
+    
EasyMock.expect(fsm.getCluster(EasyMock.anyString())).andReturn(cluster).anyTimes();
+    EasyMock.expect(fsm.getHost(EasyMock.anyString())).andReturn(host);
+    EasyMock.expect(cluster.getService(EasyMock.anyString())).andReturn(null);
+    
EasyMock.expect(host.getLastRegistrationTime()).andReturn(HOST_REGISTRATION_TIME);
+    
EasyMock.expect(host.getHostName()).andReturn(Stage.INTERNAL_HOSTNAME).anyTimes();
+    EasyMock.expect(host.getState()).andReturn(HostState.HEALTHY);
+
+    if (RoleCommand.ACTIONEXECUTE.equals(roleCommand)) {
+      
EasyMock.expect(cluster.getClusterName()).andReturn("clusterName").anyTimes();
+      EasyMock.expect(cluster.getClusterId()).andReturn(1L);
+
+      ambariEventPublisher.publish(EasyMock.anyObject(AmbariEvent.class));
+      EasyMock.expectLastCall();
+    } else if (RoleCommand.EXECUTE.equals(roleCommand)) {
+      EasyMock.expect(cluster.getClusterName()).andReturn("clusterName");
+      
EasyMock.expect(cluster.getService(EasyMock.anyString())).andReturn(service);
+      
EasyMock.expect(service.getServiceComponent(EasyMock.anyString())).andReturn(serviceComponent);
+      
EasyMock.expect(serviceComponent.getServiceComponentHost(EasyMock.anyString())).andReturn(serviceComponentHost);
+
+      
serviceComponentHost.handleEvent(EasyMock.anyObject(ServiceComponentHostEvent.class));
+      EasyMock.expectLastCall();
+    }
+
+    Stage s = getStageWithServerAction(1, 977, null, "test", 2);
+    s.setStartTime(null, Role.AMBARI_SERVER_ACTION.toString(), 
STAGE_TASK_START_TIME);
+    s.setHostRoleStatus(null, Role.AMBARI_SERVER_ACTION.toString(), 
HostRoleStatus.IN_PROGRESS);
+    
s.getExecutionCommands(null).get(0).getExecutionCommand().setServiceName("Service
 name");
+    
s.getExecutionCommands(null).get(0).getExecutionCommand().setRoleCommand(roleCommand);
+
+    aq.enqueue(Stage.INTERNAL_HOSTNAME, 
s.getExecutionCommands(null).get(0).getExecutionCommand());
+    List<ExecutionCommand> commandsToSchedule = new 
ArrayList<ExecutionCommand>();
+
+    db.timeoutHostRole(EasyMock.anyString(), EasyMock.anyLong(), 
EasyMock.anyLong(), EasyMock.anyString(), EasyMock.anyBoolean());
+    EasyMock.expectLastCall();
+
+    ActionScheduler scheduler = 
EasyMock.createMockBuilder(ActionScheduler.class)
+      .withConstructor(long.class, long.class, ActionDBAccessor.class, 
ActionQueue.class, Clusters.class, int.class,
+        HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, 
Configuration.class)
+      .withArgs(100L, 50L, db, aq, fsm, -1, null, null, ambariEventPublisher, 
null)
+      .createNiceMock();
+
+    EasyMock.replay(scheduler, fsm, host, db, cluster, ambariEventPublisher, 
service, serviceComponent, serviceComponentHost);
+
+    scheduler.processInProgressStage(s, commandsToSchedule);
+
+    EasyMock.verify(scheduler, fsm, host, db, cluster, ambariEventPublisher, 
service, serviceComponent, serviceComponentHost);
+
+    Assert.assertTrue("ActionQueue should be empty after request was timeout", 
aq.size(Stage.INTERNAL_HOSTNAME) == 0);
+  }
+
+  @Test
   public void testServerActionFailed() throws Exception {
     ActionQueue aq = new ActionQueue();
     Properties properties = new Properties();
@@ -956,8 +1059,10 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), unitOfWork, null, conf);
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
+            new HostsMap((String) null), unitOfWork, null, conf));
+
+    
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
 
     scheduler.doWork();
 
@@ -1044,9 +1149,12 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
             new HostsMap((String) null),
-            unitOfWork, null, conf);
+            unitOfWork, null, conf));
+
+
+    
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
 
     scheduler.doWork();
 
@@ -1115,9 +1223,11 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
     Configuration conf = new Configuration(properties);
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
         new HostsMap((String) null),
-        unitOfWork, null, conf);
+        unitOfWork, null, conf));
+
+    
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
 
     scheduler.doWork();
 
@@ -1637,6 +1747,7 @@ public class TestActionScheduler {
     Stage stage = stageFactory.createNew(requestId, "/tmp", clusterName, 1L, 
"getStageWithSingleTask",
       CLUSTER_HOST_INFO, "{\"host_param\":\"param_value\"}", 
"{\"stage_param\":\"param_value\"}");
     stage.setStageId(stageId);
+    //stage.setAutoSkipFailureSupported(true);
     return stage;
   }
 
@@ -2266,8 +2377,10 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
 
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
+        new HostsMap((String) null), unitOfWork, null, conf));
+
+    
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
 
     // Execution of request 1
 
@@ -2464,8 +2577,10 @@ public class TestActionScheduler {
       }
     }).when(db).abortOperation(anyLong());
 
-    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+    ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
+        new HostsMap((String) null), unitOfWork, null, conf));
+
+    
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
 
     scheduler.doWork();
 

Reply via email to