This is an automated email from the ASF dual-hosted git repository.

dsen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4fde749  [AMBARI-24733] Rolling Restarts: Option to pause, resume and 
abort re… (#2424)
4fde749 is described below

commit 4fde749d3f8ff9ab37ec40a487ddc47cb9ab8ce3
Author: Dmitry Sen <d...@apache.org>
AuthorDate: Fri Oct 12 23:30:44 2018 +0300

    [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort re… 
(#2424)
    
    * [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort 
restarts (dsen) - initial
    
    * [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort 
restarts (dsen)
    
    * [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort 
restarts (dsen)
    
    * [AMBARI-24682] Rolling Restarts: Option to specify number of failures per 
batch (dsen) - added failed task count on resume
    
    * [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort 
restarts (dsen) - changes according to review
---
 .../api/services/RequestScheduleService.java       |  34 ++++
 .../internal/RequestScheduleResourceProvider.java  |  24 ++-
 .../scheduler/AbstractLinearExecutionJob.java      |  17 ++
 .../server/scheduler/ExecutionScheduleManager.java | 190 ++++++++++++++++++++-
 .../server/state/scheduler/BatchRequest.java       |  10 ++
 .../server/state/scheduler/RequestExecution.java   |  10 +-
 .../state/scheduler/RequestExecutionImpl.java      |  39 +++++
 .../scheduler/ExecutionScheduleManagerTest.java    | 159 ++++++++++++++++-
 .../ambari/server/state/RequestExecutionTest.java  |   3 +-
 9 files changed, 468 insertions(+), 18 deletions(-)

diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
index 8c08384..778d575 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
@@ -41,6 +42,7 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
@@ -162,6 +164,38 @@ public class RequestScheduleService extends BaseService {
       createRequestSchedule(m_clusterName, null));
   }
 
+
+  /**
+   * Handles PUT /clusters/{clusterId}/request_schedules/{requestScheduleId}
+   * Update a specific request schedule (usually to pause or resume it)
+   *
+   * @return
+   */
+
+  @PUT
+  @Path("{requestScheduleId}")
+  @Produces(MediaType.TEXT_PLAIN)
+  @ApiOperation(value = "Updates a scheduled request, usually used to pause 
running scheduled requests or to resume them.",
+    notes = "Changes the state of an existing request. Usually used to pause 
running scheduled requests or to resume them.",
+    nickname = "RequestSchedules#updateRequestSchedule"
+  )
+  @ApiImplicitParams({
+    @ApiImplicitParam(dataType = REQUEST_SCHEDULE_REQUEST_TYPE, paramType = 
PARAM_TYPE_BODY)
+  })
+  @ApiResponses({
+    @ApiResponse(code = HttpStatus.SC_OK, message = MSG_SUCCESSFUL_OPERATION),
+    @ApiResponse(code = HttpStatus.SC_ACCEPTED, message = 
MSG_REQUEST_ACCEPTED),
+    @ApiResponse(code = HttpStatus.SC_BAD_REQUEST, message = 
MSG_INVALID_ARGUMENTS),
+    @ApiResponse(code = HttpStatus.SC_NOT_FOUND, message = 
MSG_RESOURCE_NOT_FOUND),
+    @ApiResponse(code = HttpStatus.SC_UNAUTHORIZED, message = 
MSG_NOT_AUTHENTICATED),
+    @ApiResponse(code = HttpStatus.SC_FORBIDDEN, message = 
MSG_PERMISSION_DENIED),
+    @ApiResponse(code = HttpStatus.SC_INTERNAL_SERVER_ERROR, message = 
MSG_SERVER_ERROR),
+  })
+  public Response updateRequestSchedule(String body, @Context HttpHeaders 
headers, @Context UriInfo ui,
+                                 @ApiParam @PathParam("requestScheduleId") 
String requestScheduleId) {
+    return handleRequest(headers, body, ui, Request.Type.PUT, 
createRequestSchedule(m_clusterName, requestScheduleId));
+  }
+
   /**
    * Handles DELETE /clusters/{clusterId}/request_schedules/{requestScheduleId}
    * Delete a request schedule
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
index 5e8f849..bde31a5 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
@@ -430,11 +430,22 @@ public class RequestScheduleResourceProvider extends 
AbstractControllerResourceP
       String username = getManagementController().getAuthName();
       Integer userId = getManagementController().getAuthId();
 
-      requestExecution.setBatch(request.getBatch());
-      requestExecution.setDescription(request.getDescription());
-      requestExecution.setSchedule(request.getSchedule());
-      if (request.getStatus() != null && isValidRequestScheduleStatus
-          (request.getStatus())) {
+      if (request.getDescription() != null) {
+        requestExecution.setDescription(request.getDescription());
+      }
+
+      if (request.getSchedule() != null) {
+        requestExecution.setSchedule(request.getSchedule());
+      }
+
+      if (request.getStatus() != null) {
+        //TODO status changes graph
+        if (!isValidRequestScheduleStatus(request.getStatus())) {
+          throw new AmbariException("Request Schedule status not valid"
+            + ", clusterName = " + request.getClusterName()
+            + ", description = " + request.getDescription()
+            + ", id = " + request.getId());
+        }
         
requestExecution.setStatus(RequestExecution.Status.valueOf(request.getStatus()));
       }
       requestExecution.setUpdateUser(username);
@@ -443,6 +454,7 @@ public class RequestScheduleResourceProvider extends 
AbstractControllerResourceP
       LOG.info("Persisting updated Request Schedule "
         + ", clusterName = " + request.getClusterName()
         + ", description = " + request.getDescription()
+        + ", status = " + request.getStatus()
         + ", user = " + username);
 
       requestExecution.persist();
@@ -502,7 +514,7 @@ public class RequestScheduleResourceProvider extends 
AbstractControllerResourceP
 
       // Setup batch schedule
       getManagementController().getExecutionScheduleManager()
-        .scheduleBatch(requestExecution);
+        .scheduleAllBatches(requestExecution);
 
       RequestScheduleResponse response = new RequestScheduleResponse
         (requestExecution.getId(), requestExecution.getClusterName(),
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
index 4599dfa..d99e89e 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
@@ -17,6 +17,10 @@
  */
 package org.apache.ambari.server.scheduler;
 
+import static 
org.apache.ambari.server.state.scheduler.BatchRequestJob.BATCH_REQUEST_CLUSTER_NAME_KEY;
+import static 
org.apache.ambari.server.state.scheduler.BatchRequestJob.BATCH_REQUEST_EXECUTION_ID_KEY;
+import static 
org.apache.ambari.server.state.scheduler.RequestExecution.Status.ABORTED;
+import static 
org.apache.ambari.server.state.scheduler.RequestExecution.Status.PAUSED;
 import static org.quartz.DateBuilder.futureDate;
 import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
 import static org.quartz.TriggerBuilder.newTrigger;
@@ -125,6 +129,19 @@ public abstract class AbstractLinearExecutionJob 
implements ExecutionJob {
       return;
     }
 
+    String status = null;
+    try {
+      status = 
executionScheduleManager.getBatchRequestStatus(jobDataMap.getLong(BATCH_REQUEST_EXECUTION_ID_KEY),
 jobDataMap.getString(BATCH_REQUEST_CLUSTER_NAME_KEY));
+    } catch (AmbariException e) {
+      LOG.warn("Unable to define the status of batch request : ", e);
+    }
+
+    if(ABORTED.name().equals(status) || PAUSED.name().equals(status)) {
+      LOG.info("The linear job chain was paused or aborted, not triggering the 
next one");
+      return;
+    }
+
+
     int separationSeconds = 
jobDataMap.getIntValue(NEXT_EXECUTION_SEPARATION_SECONDS);
     Object failedCount = 
properties.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY);
     Object totalCount = 
properties.get(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY);
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index 4f03a3e..3295395 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -18,6 +18,9 @@
 
 package org.apache.ambari.server.scheduler;
 
+import static 
org.apache.ambari.server.state.scheduler.RequestExecution.Status.ABORTED;
+import static 
org.apache.ambari.server.state.scheduler.RequestExecution.Status.PAUSED;
+import static 
org.apache.ambari.server.state.scheduler.RequestExecution.Status.SCHEDULED;
 import static org.quartz.CronScheduleBuilder.cronSchedule;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
@@ -29,6 +32,7 @@ import java.security.SecureRandom;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.text.ParseException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -47,6 +51,14 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionDBAccessor;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.configuration.Configuration;
+import 
org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
+import org.apache.ambari.server.controller.internal.RequestImpl;
+import org.apache.ambari.server.controller.internal.RequestResourceProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.RequestStatus;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.utilities.PredicateBuilder;
 import 
org.apache.ambari.server.security.authorization.internal.InternalTokenClientFilter;
 import 
org.apache.ambari.server.security.authorization.internal.InternalTokenStorage;
 import org.apache.ambari.server.state.Cluster;
@@ -275,13 +287,42 @@ public class ExecutionScheduleManager {
     return true;
   }
 
+
+  private long getFirstJobOrderId(RequestExecution requestExecution) throws 
AmbariException {
+    Long firstBatchOrderId = null;
+    Batch batch = requestExecution.getBatch();
+    if (batch != null) {
+      List<BatchRequest> batchRequests = batch.getBatchRequests();
+      if (batchRequests != null) {
+        Collections.sort(batchRequests);
+        ListIterator<BatchRequest> iterator = batchRequests.listIterator();
+        firstBatchOrderId = iterator.next().getOrderId();
+      }
+    }
+    if (firstBatchOrderId == null) {
+      throw new AmbariException("Can't schedule RequestExecution with no 
batches");
+    }
+    return firstBatchOrderId;
+  }
+
   /**
    * Persist jobs based on the request batch and create trigger for the first
    * job
    * @param requestExecution
    * @throws AmbariException
    */
-  public void scheduleBatch(RequestExecution requestExecution)
+  public void scheduleAllBatches(RequestExecution requestExecution) throws 
AmbariException {
+    Long firstBatchOrderId = getFirstJobOrderId(requestExecution);
+    scheduleBatch(requestExecution, firstBatchOrderId);
+  }
+
+  /**
+   * Persist jobs based on the request batches starting from the defined batch 
and create trigger for the first
+   * job
+   * @param requestExecution
+   * @throws AmbariException
+   */
+  public void scheduleBatch(RequestExecution requestExecution, long 
startingBatchOrderId)
     throws AmbariException {
 
     if (!isSchedulerAvailable()) {
@@ -297,15 +338,18 @@ public class ExecutionScheduleManager {
       LOG.error("Unable to determine scheduler state.", e);
       throw new AmbariException("Scheduler unavailable.");
     }
+    LOG.debug("Scheduling jobs starting from " + startingBatchOrderId);
 
     // Create and persist jobs based on batches
-    JobDetail firstJobDetail = persistBatch(requestExecution);
+    JobDetail firstJobDetail = persistBatch(requestExecution, 
startingBatchOrderId);
 
     if (firstJobDetail == null) {
       throw new AmbariException("Unable to schedule jobs. firstJobDetail = "
         + firstJobDetail);
     }
 
+    Integer failedCount = 
countFailedTasksBeforeStartingBatch(requestExecution, startingBatchOrderId);
+
     // Create a cron trigger for the first batch job
     // If no schedule is specified create simple trigger to fire right away
     Schedule schedule = requestExecution.getSchedule();
@@ -332,6 +376,7 @@ public class ExecutionScheduleManager {
           .withSchedule(cronSchedule(triggerExpression)
             .withMisfireHandlingInstructionFireAndProceed())
           .forJob(firstJobDetail)
+          .usingJobData(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 
failedCount)
           .startAt(startDate)
           .endAt(endDate)
           .build();
@@ -351,6 +396,7 @@ public class ExecutionScheduleManager {
         .withIdentity(REQUEST_EXECUTION_TRIGGER_PREFIX + "-" +
           requestExecution.getId(), 
ExecutionJob.LINEAR_EXECUTION_TRIGGER_GROUP)
         .withSchedule(simpleSchedule().withMisfireHandlingInstructionFireNow())
+        .usingJobData(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 
failedCount)
         .startNow()
         .build();
 
@@ -364,7 +410,35 @@ public class ExecutionScheduleManager {
     }
   }
 
-  private JobDetail persistBatch(RequestExecution requestExecution)
+  private Integer countFailedTasksBeforeStartingBatch(RequestExecution 
requestExecution, long startingBatchOrderId) throws AmbariException {
+    int result = 0;
+    Batch batch = requestExecution.getBatch();
+    if (batch != null) {
+      List<BatchRequest> batchRequests = batch.getBatchRequests();
+      if (batchRequests != null) {
+        Collections.sort(batchRequests);
+        for (BatchRequest batchRequest : batchRequests) {
+          if (batchRequest.getOrderId() >= startingBatchOrderId) break;
+
+          if (batchRequest.getRequestId() != null) {
+            BatchRequestResponse batchRequestResponse = 
getBatchRequestResponse(batchRequest.getRequestId(), 
requestExecution.getClusterName());
+            if (batchRequestResponse != null) {
+               result += batchRequestResponse.getFailedTaskCount() +
+                         batchRequestResponse.getAbortedTaskCount() +
+                         batchRequestResponse.getTimedOutTaskCount();
+            }
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Creates and stores the chain of BatchRequestJobs - quartz jobs - in order 
from the last order Id to the startingBatchOrderId
+   * @return the quartz job that corresponds to startingBatchOrderId
+   */
+  private JobDetail persistBatch(RequestExecution requestExecution, long 
startingBatchOrderId)
     throws  AmbariException {
 
     Batch batch = requestExecution.getBatch();
@@ -376,7 +450,8 @@ public class ExecutionScheduleManager {
         Collections.sort(batchRequests);
         ListIterator<BatchRequest> iterator = 
batchRequests.listIterator(batchRequests.size());
         String nextJobName = null;
-        while (iterator.hasPrevious()) {
+        long nextBatchOrderId = batchRequests.size();
+        while (nextBatchOrderId != startingBatchOrderId && 
iterator.hasPrevious()) {
           BatchRequest batchRequest = iterator.previous();
 
           String jobName = getJobName(requestExecution.getId(),
@@ -409,6 +484,7 @@ public class ExecutionScheduleManager {
           }
 
           nextJobName = jobName;
+          nextBatchOrderId = batchRequest.getOrderId();
         }
       }
     }
@@ -421,14 +497,60 @@ public class ExecutionScheduleManager {
   }
 
   /**
-   * Delete and re-create all jobs and triggers
-   * Update schedule for a batch
+   * Pause/resume/abort request schedule and related jobs and triggers
    * @param requestExecution
    */
   public void updateBatchSchedule(RequestExecution requestExecution)
     throws AmbariException {
+    BatchRequest activeBatch = calculateActiveBatch(requestExecution);
+    if (activeBatch == null) {
+      LOG.warn("Ignoring RequestExecution status update since all batches has 
been executed");
+      return;
+    }
+    if (requestExecution.getStatus().equals(SCHEDULED.name())) {
+      scheduleBatch(requestExecution, activeBatch.getOrderId());
+    } else if (requestExecution.getStatus().equals(PAUSED.name()) ||
+               requestExecution.getStatus().equals(ABORTED.name())) {
+      LOG.info("Request execution status changed to " + 
requestExecution.getStatus() + " for request schedule "
+        + requestExecution.getId() + ". Deleting related jobs.");
+      deleteJobs(requestExecution, activeBatch.getOrderId());
+      Collection<Long> requestIDsToAbort = 
requestExecution.getBatchRequestRequestsIDs(activeBatch.getOrderId());
+      for (Long requestId : requestIDsToAbort) {
+        //might be null if the request is not for a long running job
+        if (requestId == null) continue;
+        abortRequestById(requestExecution, requestId);
+      }
+    }
+  }
+
+  /**
+   * Iterate through the batches and find the first one whose status is not 
completed; if all were completed, return null
+   * @param requestExecution
+   * @return
+   */
+  private BatchRequest calculateActiveBatch(RequestExecution requestExecution) 
{
+    BatchRequest result = null;
+    Batch batch = requestExecution.getBatch();
+    if (batch != null) {
+      List<BatchRequest> batchRequests = batch.getBatchRequests();
+      if (batchRequests != null) {
+        Collections.sort(batchRequests);
+        ListIterator<BatchRequest> iterator = batchRequests.listIterator();
+        do {
+          result = iterator.next();
+        } while (iterator.hasNext() &&
+                 
HostRoleStatus.getCompletedStates().contains(HostRoleStatus.valueOf(result.getStatus()))
 &&
+                 !HostRoleStatus.ABORTED.name().equals(result.getStatus()));
+      }
+    }
+
+    if (result != null &&
+      
HostRoleStatus.getCompletedStates().contains(HostRoleStatus.valueOf(result.getStatus()))
 &&
+      !HostRoleStatus.ABORTED.name().equals(result.getStatus())) {
+      return null;
+    }
 
-    // TODO: Support delete and update if no jobs are running
+    return result;
   }
 
   /**
@@ -483,6 +605,15 @@ public class ExecutionScheduleManager {
    * @throws AmbariException
    */
   public void deleteAllJobs(RequestExecution requestExecution) throws 
AmbariException {
+    Long firstBatchOrderId = getFirstJobOrderId(requestExecution);
+    deleteJobs(requestExecution, firstBatchOrderId);
+  }
+
+  /**
+   * Delete jobs and triggers, if possible, skipping batches ordered before startingBatchOrderId.
+   * @throws AmbariException
+   */
+  public void deleteJobs(RequestExecution requestExecution, Long 
startingBatchOrderId) throws AmbariException {
     if (!isSchedulerAvailable()) {
       throw new AmbariException("Scheduler unavailable.");
     }
@@ -492,7 +623,11 @@ public class ExecutionScheduleManager {
     if (batch != null) {
       List<BatchRequest> batchRequests = batch.getBatchRequests();
       if (batchRequests != null) {
+        Collections.sort(batchRequests);
         for (BatchRequest batchRequest : batchRequests) {
+          //skip all before starting batch
+          if (batchRequest.getOrderId() < startingBatchOrderId) continue;
+
           String jobName = getJobName(requestExecution.getId(),
             batchRequest.getOrderId());
 
@@ -540,6 +675,8 @@ public class ExecutionScheduleManager {
         
actionDBAccessor.setSourceScheduleForRequest(batchRequestResponse.getRequestId(),
 executionId);
       }
 
+      batchRequest.setRequestId(batchRequestResponse.getRequestId());
+
       return batchRequestResponse.getRequestId();
     } catch (Exception e) {
       throw new AmbariException("Exception occurred while performing request", 
e);
@@ -566,6 +703,33 @@ public class ExecutionScheduleManager {
 
   }
 
+  protected RequestStatus abortRequestById(RequestExecution requestExecution, 
Long requestId) throws AmbariException {
+    LOG.debug("Aborting request " + requestId);
+    ResourceProvider provider =
+      
AbstractControllerResourceProvider.getResourceProvider(Resource.Type.Request);
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(RequestResourceProvider.REQUEST_CLUSTER_NAME_PROPERTY_ID, 
requestExecution.getClusterName());
+    properties.put(RequestResourceProvider.REQUEST_ABORT_REASON_PROPERTY_ID, 
"Request execution status changed to " + requestExecution.getStatus());
+    properties.put(RequestResourceProvider.REQUEST_ID_PROPERTY_ID, 
Long.toString(requestId));
+    properties.put(RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID, 
HostRoleStatus.ABORTED.name());
+
+    org.apache.ambari.server.controller.spi.Request request = new 
RequestImpl(Collections.emptySet(),
+        Collections.singleton(properties), Collections.emptyMap(), null);
+
+    Predicate predicate =  new PredicateBuilder()
+        .property(RequestResourceProvider.REQUEST_CLUSTER_NAME_PROPERTY_ID)
+        .equals(requestExecution.getClusterName()).and()
+        .property(RequestResourceProvider.REQUEST_ID_PROPERTY_ID)
+        .equals(Long.toString(requestId)).toPredicate();
+
+    try{
+      return provider.updateResources(request, predicate);
+    } catch (Exception e) {
+     throw new AmbariException("Error while aborting the request.", e);
+    }
+  }
+
   private BatchRequestResponse convertToBatchRequestResponse(ClientResponse 
clientResponse) {
     BatchRequestResponse batchRequestResponse = new BatchRequestResponse();
     int retCode = clientResponse.getStatus();
@@ -638,6 +802,18 @@ public class ExecutionScheduleManager {
     return batchRequestResponse;
   }
 
+  public String getBatchRequestStatus(Long executionId, String clusterName) 
throws AmbariException {
+    Cluster cluster = clusters.getCluster(clusterName);
+    RequestExecution requestExecution = 
cluster.getAllRequestExecutions().get(executionId);
+
+    if (requestExecution == null) {
+      throw new AmbariException("Unable to find request schedule with id = "
+          + executionId);
+    }
+
+    return requestExecution.getStatus();
+  }
+
   public void updateBatchRequest(long executionId, long batchId, String 
clusterName,
                                  BatchRequestResponse batchRequestResponse,
                                  boolean statusOnly) throws AmbariException {
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
index 3063d29..a6d194f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
@@ -22,6 +22,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 
 public class BatchRequest implements Comparable<BatchRequest> {
   private Long orderId;
+  private Long requestId;
   private Type type;
   private String uri;
   private String body;
@@ -38,6 +39,15 @@ public class BatchRequest implements 
Comparable<BatchRequest> {
     this.orderId = orderId;
   }
 
+  @JsonProperty("request_id")
+  public Long getRequestId() {
+    return requestId;
+  }
+
+  public void setRequestId(Long requestId) {
+    this.requestId = requestId;
+  }
+
   @JsonProperty("request_type")
   public String getType() {
     return type.name();
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
index 6d9c40b..f91b6f5 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ambari.server.state.scheduler;
 
+import java.util.Collection;
+
 import org.apache.ambari.server.controller.RequestScheduleResponse;
 
 /**
@@ -161,6 +163,10 @@ public interface RequestExecution {
   String getRequestBody(Long batchId);
 
   /**
+   * Get the request IDs for the batch
+   */
+  Collection<Long> getBatchRequestRequestsIDs(long batchId);
+  /**
    * Get batch request with specified order id
    */
   BatchRequest getBatchRequest(long batchId);
@@ -184,6 +190,8 @@ public interface RequestExecution {
   enum Status {
     SCHEDULED,
     COMPLETED,
-    DISABLED
+    DISABLED,
+    ABORTED,
+    PAUSED
   }
 }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
index 104ca9b..fe18b9a 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
@@ -17,14 +17,17 @@
  */
 package org.apache.ambari.server.state.scheduler;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.controller.RequestScheduleResponse;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -115,6 +118,10 @@ public class RequestExecutionImpl implements 
RequestExecution {
           batchRequestEntities) {
         BatchRequest batchRequest = new BatchRequest();
         batchRequest.setOrderId(batchRequestEntity.getBatchId());
+        batchRequest.setRequestId(batchRequestEntity.getRequestId());
+        if (batchRequestEntity.getRequestBody() != null) {
+          batchRequest.setBody(new 
String(batchRequestEntity.getRequestBody()));
+        }
         
batchRequest.setType(BatchRequest.Type.valueOf(batchRequestEntity.getRequestType()));
         batchRequest.setUri(batchRequestEntity.getRequestUri());
         batchRequest.setStatus(batchRequestEntity.getRequestStatus());
@@ -133,6 +140,13 @@ public class RequestExecutionImpl implements 
RequestExecution {
     schedule.setStartTime(requestScheduleEntity.getStartTime());
     schedule.setEndTime(requestScheduleEntity.getEndTime());
 
+    //if all values are nulls set the general scheduler to null
+    if (schedule.getDayOfWeek() == null && schedule.getDaysOfMonth() == null &&
+          schedule.getMinutes() == null && schedule.getHours() == null &&
+          schedule.getMonth() == null && schedule.getYear() == null &&
+          schedule.getStartTime() == null && schedule.getEndTime() == null) {
+      schedule = null;
+    }
     isPersisted = true;
   }
 
@@ -276,6 +290,7 @@ public class RequestExecutionImpl implements 
RequestExecution {
           RequestScheduleBatchRequestEntity batchRequestEntity = new
             RequestScheduleBatchRequestEntity();
           batchRequestEntity.setBatchId(batchRequest.getOrderId());
+          batchRequestEntity.setRequestId(batchRequest.getRequestId());
           
batchRequestEntity.setScheduleId(requestScheduleEntity.getScheduleId());
           batchRequestEntity.setRequestScheduleEntity(requestScheduleEntity);
           batchRequestEntity.setRequestType(batchRequest.getType());
@@ -428,6 +443,22 @@ public class RequestExecutionImpl implements 
RequestExecution {
   }
 
   @Override
+  public Collection<Long> getBatchRequestRequestsIDs(long batchId) {
+    Collection<Long> requestIDs = new ArrayList<>();
+    if (requestScheduleEntity != null) {
+      Collection<RequestScheduleBatchRequestEntity> requestEntities =
+        requestScheduleEntity.getRequestScheduleBatchRequestEntities();
+      if (requestEntities != null) {
+        requestIDs.addAll(requestEntities.stream()
+          .filter(requestEntity -> requestEntity.getBatchId().equals(batchId))
+          .map(RequestScheduleBatchRequestEntity::getRequestId)
+          .collect(Collectors.toList()));
+      }
+    }
+    return requestIDs;
+  }
+
+  @Override
   public BatchRequest getBatchRequest(long batchId) {
     for (BatchRequest batchRequest : batch.getBatchRequests()) {
       if (batchId == batchRequest.getOrderId()) {
@@ -452,6 +483,14 @@ public class RequestExecutionImpl implements 
RequestExecution {
       }
     }
 
+    // Rare race condition: the batch request may finish while the request 
execution is being paused.
+    // In this case the job details will be deleted,
+    // so we mark it as ABORTED (not completed) because otherwise the job detail will be 
lost
+    // and the whole Request Execution status will not be set to COMPLETED at 
the end.
+    if (Status.PAUSED.name().equals(getStatus()) && 
HostRoleStatus.COMPLETED.name().equals(batchRequestResponse.getStatus())) {
+      batchRequestResponse.setStatus(HostRoleStatus.ABORTED.name());
+    }
+
     if (batchRequestEntity != null) {
       batchRequestEntity.setRequestStatus(batchRequestResponse.getStatus());
 
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
index 3f4a5d5..3097279 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertThat;
 
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -216,7 +217,7 @@ public class ExecutionScheduleManagerTest {
     RequestExecution requestExecution = createRequestExecution(true);
     Assert.assertNotNull(requestExecution);
 
-    executionScheduleManager.scheduleBatch(requestExecution);
+    executionScheduleManager.scheduleAllBatches(requestExecution);
 
     String jobName1 = executionScheduleManager.getJobName(requestExecution
       .getId(), 10L);
@@ -280,7 +281,7 @@ public class ExecutionScheduleManagerTest {
     RequestExecution requestExecution = createRequestExecution(true);
     Assert.assertNotNull(requestExecution);
 
-    executionScheduleManager.scheduleBatch(requestExecution);
+    executionScheduleManager.scheduleAllBatches(requestExecution);
 
     String jobName1 = executionScheduleManager.getJobName(requestExecution
       .getId(), 10L);
@@ -308,7 +309,7 @@ public class ExecutionScheduleManagerTest {
     RequestExecution requestExecution = createRequestExecution(false);
     Assert.assertNotNull(requestExecution);
 
-    executionScheduleManager.scheduleBatch(requestExecution);
+    executionScheduleManager.scheduleAllBatches(requestExecution);
 
     String jobName1 = executionScheduleManager.getJobName(requestExecution
       .getId(), 10L);
@@ -387,6 +388,9 @@ public class ExecutionScheduleManagerTest {
     expect(batchRequestMock.getUri()).andReturn(uri).once();
     expect(batchRequestMock.getType()).andReturn(type).once();
 
+    batchRequestMock.setRequestId(5L);
+    expectLastCall().once();
+
     expect(scheduleManager.performApiRequest(eq(uri), eq(body), eq(type), 
eq(userId))).andReturn(batchRequestResponse).once();
 
     scheduleManager.updateBatchRequest(eq(executionId), eq(batchId), 
eq(clusterName), eq(batchRequestResponse), eq(false));
@@ -721,4 +725,153 @@ public class ExecutionScheduleManagerTest {
     assertEquals("http://localhost:8080/",
       scheduleManager.extendApiResource(webResource, "").getURI().toString());
   }
+
+  @Test
+  public void testUpdateBatchSchedulePause() throws Exception {
+    Clusters clustersMock = createMock(Clusters.class);
+    Cluster clusterMock = createMock(Cluster.class);
+    Configuration configurationMock = createNiceMock(Configuration.class);
+    ExecutionScheduler executionSchedulerMock = 
createMock(ExecutionScheduler.class);
+    InternalTokenStorage tokenStorageMock = 
createMock(InternalTokenStorage.class);
+    ActionDBAccessor actionDBAccessorMock = createMock(ActionDBAccessor.class);
+    Gson gson = new Gson();
+    RequestExecution requestExecutionMock = createMock(RequestExecution.class);
+    Batch batchMock = createMock(Batch.class);
+    JobDetail jobDetailMock = createMock(JobDetail.class);
+    final BatchRequest batchRequestMock1 = createMock(BatchRequest.class);
+    final BatchRequest batchRequestMock2 = createMock(BatchRequest.class);
+    final Trigger triggerMock = createNiceMock(Trigger.class);
+    final List<Trigger> triggers = new ArrayList<Trigger>()  {{ 
add(triggerMock); }};
+
+    long executionId = 11L;
+    String clusterName = "c1";
+    Date pastDate = new Date(new Date().getTime() - 2);
+
+    Map<Long, RequestExecution> executionMap = new HashMap<>();
+    executionMap.put(executionId, requestExecutionMock);
+
+    
EasyMock.expect(configurationMock.getApiSSLAuthentication()).andReturn(Boolean.FALSE);
+    EasyMock.replay(configurationMock);
+
+    ExecutionScheduleManager scheduleManager =
+      createMockBuilder(ExecutionScheduleManager.class)
+        .withConstructor(configurationMock, executionSchedulerMock, 
tokenStorageMock,
+          clustersMock, actionDBAccessorMock, gson)
+        .addMockedMethods("deleteJobs", "abortRequestById").createMock();
+
+    
expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes();
+    
expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes();
+    expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes();
+    expect(batchMock.getBatchRequests()).andReturn
+      (new ArrayList<BatchRequest>() {{
+        add(batchRequestMock1);
+        add(batchRequestMock2);
+      }});
+    expect(batchRequestMock1.getOrderId()).andReturn(1L).anyTimes();
+    
expect(batchRequestMock1.getStatus()).andReturn(HostRoleStatus.COMPLETED.name()).anyTimes();
+    
expect(batchRequestMock1.compareTo(batchRequestMock2)).andReturn(-1).anyTimes();
+    
expect(batchRequestMock2.compareTo(batchRequestMock1)).andReturn(1).anyTimes();
+    expect(batchRequestMock2.getOrderId()).andReturn(3L).anyTimes();
+    
expect(batchRequestMock2.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS.name()).anyTimes();
+    expect(executionSchedulerMock.getJobDetail((JobKey) anyObject()))
+      .andReturn(jobDetailMock).anyTimes();
+    expect((List<Trigger>) executionSchedulerMock
+      .getTriggersForJob((JobKey) anyObject())).andReturn(triggers).anyTimes();
+    expect(triggerMock.mayFireAgain()).andReturn(true).anyTimes();
+    expect(triggerMock.getFinalFireTime()).andReturn(pastDate).anyTimes();
+
+    
expect(requestExecutionMock.getStatus()).andReturn(RequestExecution.Status.PAUSED.name()).anyTimes();
+    expect(requestExecutionMock.getId()).andReturn(executionId).anyTimes();
+    
expect(requestExecutionMock.getBatchRequestRequestsIDs(3L)).andReturn(Collections.singleton(5L)).anyTimes();
+
+    //deletes only second batch, the first was completed
+    scheduleManager.deleteJobs(eq(requestExecutionMock), eq(3L));
+    expectLastCall().once();
+    //second batch request needs to be aborted
+    expect(scheduleManager.abortRequestById(requestExecutionMock, 
5L)).andReturn(null).once();
+
+    replay(clustersMock, clusterMock, requestExecutionMock,
+      executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1, 
batchRequestMock2,
+      triggerMock, jobDetailMock, actionDBAccessorMock);
+
+    scheduleManager.updateBatchSchedule(requestExecutionMock);
+
+    verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+      executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1, 
batchRequestMock2,
+      triggerMock, jobDetailMock, actionDBAccessorMock);
+  }
+
+  @Test
+  public void testUpdateBatchScheduleUnpause() throws Exception {
+    Clusters clustersMock = createMock(Clusters.class);
+    Cluster clusterMock = createMock(Cluster.class);
+    Configuration configurationMock = createNiceMock(Configuration.class);
+    ExecutionScheduler executionSchedulerMock = 
createMock(ExecutionScheduler.class);
+    InternalTokenStorage tokenStorageMock = 
createMock(InternalTokenStorage.class);
+    ActionDBAccessor actionDBAccessorMock = createMock(ActionDBAccessor.class);
+    Gson gson = new Gson();
+    RequestExecution requestExecutionMock = createMock(RequestExecution.class);
+    Batch batchMock = createMock(Batch.class);
+    JobDetail jobDetailMock = createMock(JobDetail.class);
+    final BatchRequest batchRequestMock1 = createMock(BatchRequest.class);
+    final BatchRequest batchRequestMock2 = createMock(BatchRequest.class);
+    final Trigger triggerMock = createNiceMock(Trigger.class);
+    final List<Trigger> triggers = new ArrayList<Trigger>()  {{ 
add(triggerMock); }};
+
+    long executionId = 11L;
+    String clusterName = "c1";
+    Date pastDate = new Date(new Date().getTime() - 2);
+
+    Map<Long, RequestExecution> executionMap = new HashMap<>();
+    executionMap.put(executionId, requestExecutionMock);
+
+    
EasyMock.expect(configurationMock.getApiSSLAuthentication()).andReturn(Boolean.FALSE);
+    EasyMock.replay(configurationMock);
+
+    ExecutionScheduleManager scheduleManager =
+      createMockBuilder(ExecutionScheduleManager.class)
+        .withConstructor(configurationMock, executionSchedulerMock, 
tokenStorageMock,
+          clustersMock, actionDBAccessorMock, gson)
+        .addMockedMethods("scheduleBatch").createMock();
+
+    
expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes();
+    
expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes();
+    expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes();
+    expect(batchMock.getBatchRequests()).andReturn
+      (new ArrayList<BatchRequest>() {{
+        add(batchRequestMock1);
+        add(batchRequestMock2);
+      }});
+    expect(batchRequestMock1.getOrderId()).andReturn(1L).anyTimes();
+    
expect(batchRequestMock1.getStatus()).andReturn(HostRoleStatus.FAILED.name()).anyTimes();
+    
expect(batchRequestMock1.compareTo(batchRequestMock2)).andReturn(-1).anyTimes();
+    
expect(batchRequestMock2.compareTo(batchRequestMock1)).andReturn(1).anyTimes();
+    expect(batchRequestMock2.getOrderId()).andReturn(3L).anyTimes();
+    
expect(batchRequestMock2.getStatus()).andReturn(HostRoleStatus.PENDING.name()).once();
+    expect(executionSchedulerMock.getJobDetail((JobKey) anyObject()))
+      .andReturn(jobDetailMock).anyTimes();
+    expect((List<Trigger>) executionSchedulerMock
+      .getTriggersForJob((JobKey) anyObject())).andReturn(triggers).anyTimes();
+    expect(triggerMock.mayFireAgain()).andReturn(true).anyTimes();
+    expect(triggerMock.getFinalFireTime()).andReturn(pastDate).anyTimes();
+
+    
expect(requestExecutionMock.getStatus()).andReturn(RequestExecution.Status.SCHEDULED.name()).anyTimes();
+    expect(requestExecutionMock.getId()).andReturn(executionId).anyTimes();
+    
expect(requestExecutionMock.getBatchRequestRequestsIDs(3L)).andReturn(Collections.singleton(5L)).anyTimes();
+
+    //schedule starting from the second batch, the first is already FAILED
+    scheduleManager.scheduleBatch(eq(requestExecutionMock), eq(3L));
+    expectLastCall().once();
+
+    replay(clustersMock, clusterMock, requestExecutionMock,
+      executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1, 
batchRequestMock2,
+      triggerMock, jobDetailMock, actionDBAccessorMock);
+
+    scheduleManager.updateBatchSchedule(requestExecutionMock);
+
+    verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+      executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1, 
batchRequestMock2,
+      triggerMock, jobDetailMock, actionDBAccessorMock);
+  }
+
 }
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
index bdcdb76..6e30c40 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
@@ -304,7 +304,8 @@ public class RequestExecutionTest {
     }
     Assert.assertNotNull(postBatchRequest);
     // Not read by default
-    Assert.assertNull(postBatchRequest.getBody());
+    Assert.assertNotNull(postBatchRequest.getBody());
+    Assert.assertEquals("testBody", postBatchRequest.getBody());
 
     RequestScheduleResponse requestScheduleResponse = requestExecution
       .convertToResponseWithBody();

Reply via email to