Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 4a4900fc4 -> 22ff2a33d


AMBARI-13145 - RU - Skipping failed task caused remaining pending tasks to be 
ABORTED (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/22ff2a33
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/22ff2a33
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/22ff2a33

Branch: refs/heads/branch-2.1
Commit: 22ff2a33dccadd5cc06502d6c9c8fc6fee1701fc
Parents: 4a4900f
Author: Jonathan Hurley <jhur...@hortonworks.com>
Authored: Fri Sep 18 10:38:42 2015 -0400
Committer: Jonathan Hurley <jhur...@hortonworks.com>
Committed: Fri Sep 18 16:22:26 2015 -0400

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   |  81 ++++++-----
 .../actionmanager/TestActionScheduler.java      | 140 +++++++++++++++++++
 2 files changed, 186 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/22ff2a33/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 562a5ca..5833125 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -286,30 +286,35 @@ class ActionScheduler implements Runnable {
         // Commands that will be scheduled in current scheduler wakeup
         List<ExecutionCommand> commandsToSchedule = new 
ArrayList<ExecutionCommand>();
         Map<String, RoleStats> roleStats = processInProgressStage(stage, 
commandsToSchedule);
+
         // Check if stage is failed
         boolean failed = false;
-        for (Map.Entry<String, RoleStats>entry : roleStats.entrySet()) {
+        for (Map.Entry<String, RoleStats> entry : roleStats.entrySet()) {
 
-          String    role  = entry.getKey();
+          String role = entry.getKey();
           RoleStats stats = entry.getValue();
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Stats for role:" + role + ", stats=" + stats);
+            LOG.debug("Stats for role: {}, stats={}", role, stats);
           }
-          if (stats.isRoleFailed()) {
+
+          // only fail the request if the role failed and the stage is not
+          // skippable
+          if (stats.isRoleFailed() && !stage.isSkippable()) {
+            LOG.warn("{} failed, request {} will be aborted", role, 
request.getRequestId());
+
             failed = true;
             break;
           }
         }
 
-        if(!failed) {
+        if (!failed) {
           // Prior stage may have failed and it may need to fail the whole 
request
           failed = hasPreviousStageFailed(stage);
         }
 
         if (failed) {
-          LOG.warn("Operation completely failed, aborting request id:"
-              + stage.getRequestId());
+          LOG.warn("Operation completely failed, aborting request id: {}", 
stage.getRequestId());
           cancelHostRoleCommands(stage.getOrderedHostRoleCommands(), 
FAILED_TASK_ABORT_REASONING);
           abortOperationsForStage(stage);
           return;
@@ -957,34 +962,37 @@ class ActionScheduler implements Runnable {
 
   private void updateRoleStats(HostRoleStatus status, RoleStats rs) {
     switch (status) {
-    case COMPLETED:
-      rs.numSucceeded++;
-      break;
-    case FAILED:
-      rs.numFailed++;
-      break;
-    case QUEUED:
-      rs.numQueued++;
-      break;
-    case PENDING:
-      rs.numPending++;
-      break;
-    case TIMEDOUT:
-      rs.numTimedOut++;
-      break;
-    case ABORTED:
-      rs.numAborted++;
-      break;
-    case IN_PROGRESS:
-      rs.numInProgress++;
-      break;
-    case HOLDING:
-    case HOLDING_FAILED:
-    case HOLDING_TIMEDOUT:
-      rs.numHolding++;
-      break;
-    default:
-      LOG.error("Unknown status " + status.name());
+      case COMPLETED:
+        rs.numSucceeded++;
+        break;
+      case FAILED:
+        rs.numFailed++;
+        break;
+      case QUEUED:
+        rs.numQueued++;
+        break;
+      case PENDING:
+        rs.numPending++;
+        break;
+      case TIMEDOUT:
+        rs.numTimedOut++;
+        break;
+      case ABORTED:
+        rs.numAborted++;
+        break;
+      case IN_PROGRESS:
+        rs.numInProgress++;
+        break;
+      case HOLDING:
+      case HOLDING_FAILED:
+      case HOLDING_TIMEDOUT:
+        rs.numHolding++;
+        break;
+      case SKIPPED_FAILED:
+        rs.numSkipped++;
+        break;
+      default:
+        LOG.error("Unknown status " + status.name());
     }
   }
 
@@ -1006,6 +1014,8 @@ class ActionScheduler implements Runnable {
     int numPending = 0;
     int numAborted = 0;
     int numHolding = 0;
+    int numSkipped = 0;
+
     final int totalHosts;
     final float successFactor;
 
@@ -1044,6 +1054,7 @@ class ActionScheduler implements Runnable {
       builder.append(", numTimedOut=").append(numTimedOut);
       builder.append(", numPending=").append(numPending);
       builder.append(", numAborted=").append(numAborted);
+      builder.append(", numSkipped=").append(numSkipped);
       builder.append(", totalHosts=").append(totalHosts);
       builder.append(", successFactor=").append(successFactor);
       return builder.toString();

http://git-wip-us.apache.org/repos/asf/ambari/blob/22ff2a33/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 5650f17..73b1649 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -2329,6 +2329,146 @@ public class TestActionScheduler {
 
   }
 
+  /**
+   * Tests that command failures in skippable stages do not cause the request 
to
+   * be aborted.
+   */
+  @Test
+  public void testSkippableCommandFailureDoesNotAbortRequest() throws 
Exception {
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    String hostname1 = "ahost.ambari.apache.org";
+
+    HashMap<String, ServiceComponentHost> hosts = new HashMap<String, 
ServiceComponentHost>();
+
+    hosts.put(hostname1, sch);
+
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+    // create 1 stage with 2 commands and then another stage with 1 command
+    Stage stage = null;
+    Stage stage2 = null;
+    final List<Stage> stages = new ArrayList<Stage>();
+    stages.add(stage = getStageWithSingleTask(hostname1, "cluster1", 
Role.NAMENODE,
+        RoleCommand.STOP, Service.Type.HDFS, 1, 1, 1));
+
+    addInstallTaskToStage(stage, hostname1, "cluster1", Role.HBASE_MASTER, 
RoleCommand.INSTALL,
+        Service.Type.HBASE, 1);
+
+    stages.add(stage2 = getStageWithSingleTask(hostname1, "cluster1", 
Role.DATANODE,
+        RoleCommand.STOP, Service.Type.HDFS, 1, 1, 1));
+
+    // !!! this is the test; make the stages skippable so that when their
+    // commands fail, the entire request is not aborted
+    for (Stage stageToMakeSkippable : stages) {
+      stageToMakeSkippable.setSkippable(true);
+    }
+
+    // fail the first task - normally this would cause an abort, exception 
that our stages
+    // are skippable now so it should not
+    HostRoleCommand command = 
stage.getOrderedHostRoleCommands().iterator().next();
+    command.setStatus(HostRoleStatus.FAILED);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    RequestEntity request = mock(RequestEntity.class);
+    when(request.isExclusive()).thenReturn(false);
+    when(db.getRequestEntity(anyLong())).thenReturn(request);
+
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        List<CommandReport> reports = (List<CommandReport>) 
invocation.getArguments()[0];
+        for (CommandReport report : reports) {
+          String actionId = report.getActionId();
+          long[] requestStageIds = StageUtils.getRequestStage(actionId);
+          Long requestId = requestStageIds[0];
+          Long stageId = requestStageIds[1];
+          Long id = report.getTaskId();
+          for (Stage stage : stages) {
+            if (requestId.equals(stage.getRequestId()) && 
stageId.equals(stage.getStageId())) {
+              for (HostRoleCommand hostRoleCommand : 
stage.getOrderedHostRoleCommands()) {
+                if (hostRoleCommand.getTaskId() == id) {
+                  
hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+                }
+              }
+            }
+          }
+
+        }
+
+        return null;
+      }
+    }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
+
+    when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Long taskId = (Long) invocation.getArguments()[0];
+        for (Stage stage : stages) {
+          for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+            if (taskId.equals(command.getTaskId())) {
+              return command;
+            }
+          }
+        }
+        return null;
+      }
+    });
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Long requestId = (Long) invocation.getArguments()[0];
+        for (Stage stage : stages) {
+          if (requestId.equals(stage.getRequestId())) {
+            for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) 
{
+              if (command.getStatus() == HostRoleStatus.QUEUED
+                  || command.getStatus() == HostRoleStatus.IN_PROGRESS
+                  || command.getStatus() == HostRoleStatus.PENDING) {
+                command.setStatus(HostRoleStatus.ABORTED);
+              }
+            }
+          }
+        }
+
+        return null;
+      }
+    }).when(db).abortOperation(anyLong());
+
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, null, conf);
+
+    scheduler.doWork();
+
+    Assert.assertEquals(HostRoleStatus.FAILED,
+        stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
+
+    // the remaining tasks should NOT have been aborted since the stage is
+    // skippable - these tasks would normally be ABORTED if the stage was not
+    // skippable
+    Assert.assertEquals(HostRoleStatus.QUEUED,
+        stages.get(0).getHostRoleStatus(hostname1, "HBASE_MASTER"));
+
+    Assert.assertEquals(HostRoleStatus.PENDING,
+        stages.get(1).getHostRoleStatus(hostname1, "DATANODE"));
+
+  }
 
   public static class MockModule extends AbstractModule {
     @Override

Reply via email to