This is an automated email from the ASF dual-hosted git repository.

dsen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 4fde749 [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort re… (#2424) 4fde749 is described below commit 4fde749d3f8ff9ab37ec40a487ddc47cb9ab8ce3 Author: Dmitry Sen <d...@apache.org> AuthorDate: Fri Oct 12 23:30:44 2018 +0300 [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort re… (#2424) * [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort restarts (dsen) - initial * [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort restarts (dsen) * [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort restarts (dsen) * [AMBARI-24682] Rolling Restarts: Option to specify number of failures per batch (dsen) - added failed task count on resume * [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort restarts (dsen) - changes according to review --- .../api/services/RequestScheduleService.java | 34 ++++ .../internal/RequestScheduleResourceProvider.java | 24 ++- .../scheduler/AbstractLinearExecutionJob.java | 17 ++ .../server/scheduler/ExecutionScheduleManager.java | 190 ++++++++++++++++++++- .../server/state/scheduler/BatchRequest.java | 10 ++ .../server/state/scheduler/RequestExecution.java | 10 +- .../state/scheduler/RequestExecutionImpl.java | 39 +++++ .../scheduler/ExecutionScheduleManagerTest.java | 159 ++++++++++++++++- .../ambari/server/state/RequestExecutionTest.java | 3 +- 9 files changed, 468 insertions(+), 18 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java index 8c08384..778d575 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java @@ -23,6 +23,7 @@ import java.util.Map; import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; +import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -41,6 +42,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; @@ -162,6 +164,38 @@ public class RequestScheduleService extends BaseService { createRequestSchedule(m_clusterName, null)); } + + /** + * Handles URL: /clusters/{clusterId}/request_schedules/{requestScheduleId} + * Get details on a specific request schedule + * + * @return + */ + + @PUT + @Path("{requestScheduleId}") + @Produces(MediaType.TEXT_PLAIN) + @ApiOperation(value = "Updates a scheduled request, usually used to pause running scheduled requests or to resume them.", + notes = "Changes the state of an existing request. 
Usually used to pause running scheduled requests or to resume them.", + nickname = "RequestSchedules#updateRequestSchedule" + ) + @ApiImplicitParams({ + @ApiImplicitParam(dataType = REQUEST_SCHEDULE_REQUEST_TYPE, paramType = PARAM_TYPE_BODY) + }) + @ApiResponses({ + @ApiResponse(code = HttpStatus.SC_OK, message = MSG_SUCCESSFUL_OPERATION), + @ApiResponse(code = HttpStatus.SC_ACCEPTED, message = MSG_REQUEST_ACCEPTED), + @ApiResponse(code = HttpStatus.SC_BAD_REQUEST, message = MSG_INVALID_ARGUMENTS), + @ApiResponse(code = HttpStatus.SC_NOT_FOUND, message = MSG_RESOURCE_NOT_FOUND), + @ApiResponse(code = HttpStatus.SC_UNAUTHORIZED, message = MSG_NOT_AUTHENTICATED), + @ApiResponse(code = HttpStatus.SC_FORBIDDEN, message = MSG_PERMISSION_DENIED), + @ApiResponse(code = HttpStatus.SC_INTERNAL_SERVER_ERROR, message = MSG_SERVER_ERROR), + }) + public Response updateRequestSchedule(String body, @Context HttpHeaders headers, @Context UriInfo ui, + @ApiParam @PathParam("requestScheduleId") String requestScheduleId) { + return handleRequest(headers, body, ui, Request.Type.PUT, createRequestSchedule(m_clusterName, requestScheduleId)); + } + /** * Handles DELETE /clusters/{clusterId}/request_schedules/{requestScheduleId} * Delete a request schedule diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java index 5e8f849..bde31a5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java @@ -430,11 +430,22 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP String username = getManagementController().getAuthName(); Integer userId = getManagementController().getAuthId(); - requestExecution.setBatch(request.getBatch()); - requestExecution.setDescription(request.getDescription()); - requestExecution.setSchedule(request.getSchedule()); - if (request.getStatus() != null && isValidRequestScheduleStatus - (request.getStatus())) { + if (request.getDescription() != null) { + requestExecution.setDescription(request.getDescription()); + } + + if (request.getSchedule() != null) { + requestExecution.setSchedule(request.getSchedule()); + } + + if (request.getStatus() != null) { + //TODO status changes graph + if (!isValidRequestScheduleStatus(request.getStatus())) { + throw new AmbariException("Request Schedule status not valid" + + ", clusterName = " + request.getClusterName() + + ", description = " + request.getDescription() + + ", id = " + request.getId()); + } requestExecution.setStatus(RequestExecution.Status.valueOf(request.getStatus())); } requestExecution.setUpdateUser(username); @@ -443,6 +454,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP LOG.info("Persisting updated Request Schedule " + ", clusterName = " + request.getClusterName() + ", description = " + request.getDescription() + + ", status = " + request.getStatus() + ", user = " + username); requestExecution.persist(); @@ -502,7 +514,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP // Setup batch schedule getManagementController().getExecutionScheduleManager() - .scheduleBatch(requestExecution); + .scheduleAllBatches(requestExecution); RequestScheduleResponse response = new RequestScheduleResponse 
(requestExecution.getId(), requestExecution.getClusterName(), diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java index 4599dfa..d99e89e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java @@ -17,6 +17,10 @@ */ package org.apache.ambari.server.scheduler; +import static org.apache.ambari.server.state.scheduler.BatchRequestJob.BATCH_REQUEST_CLUSTER_NAME_KEY; +import static org.apache.ambari.server.state.scheduler.BatchRequestJob.BATCH_REQUEST_EXECUTION_ID_KEY; +import static org.apache.ambari.server.state.scheduler.RequestExecution.Status.ABORTED; +import static org.apache.ambari.server.state.scheduler.RequestExecution.Status.PAUSED; import static org.quartz.DateBuilder.futureDate; import static org.quartz.SimpleScheduleBuilder.simpleSchedule; import static org.quartz.TriggerBuilder.newTrigger; @@ -125,6 +129,19 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob { return; } + String status = null; + try { + status = executionScheduleManager.getBatchRequestStatus(jobDataMap.getLong(BATCH_REQUEST_EXECUTION_ID_KEY), jobDataMap.getString(BATCH_REQUEST_CLUSTER_NAME_KEY)); + } catch (AmbariException e) { + LOG.warn("Unable to define the status of batch request : ", e); + } + + if(ABORTED.name().equals(status) || PAUSED.name().equals(status)) { + LOG.info("The linear job chain was paused or aborted, not triggering the next one"); + return; + } + + int separationSeconds = jobDataMap.getIntValue(NEXT_EXECUTION_SEPARATION_SECONDS); Object failedCount = properties.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY); Object totalCount = properties.get(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java index 4f03a3e..3295395 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java @@ -18,6 +18,9 @@ package org.apache.ambari.server.scheduler; +import static org.apache.ambari.server.state.scheduler.RequestExecution.Status.ABORTED; +import static org.apache.ambari.server.state.scheduler.RequestExecution.Status.PAUSED; +import static org.apache.ambari.server.state.scheduler.RequestExecution.Status.SCHEDULED; import static org.quartz.CronScheduleBuilder.cronSchedule; import static org.quartz.JobBuilder.newJob; import static org.quartz.SimpleScheduleBuilder.simpleSchedule; @@ -29,6 +32,7 @@ import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.text.ParseException; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -47,6 +51,14 @@ import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.ActionDBAccessor; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider; +import 
org.apache.ambari.server.controller.internal.RequestImpl; +import org.apache.ambari.server.controller.internal.RequestResourceProvider; +import org.apache.ambari.server.controller.spi.Predicate; +import org.apache.ambari.server.controller.spi.RequestStatus; +import org.apache.ambari.server.controller.spi.Resource; +import org.apache.ambari.server.controller.spi.ResourceProvider; +import org.apache.ambari.server.controller.utilities.PredicateBuilder; import org.apache.ambari.server.security.authorization.internal.InternalTokenClientFilter; import org.apache.ambari.server.security.authorization.internal.InternalTokenStorage; import org.apache.ambari.server.state.Cluster; @@ -275,13 +287,42 @@ public class ExecutionScheduleManager { return true; } + + private long getFirstJobOrderId(RequestExecution requestExecution) throws AmbariException { + Long firstBatchOrderId = null; + Batch batch = requestExecution.getBatch(); + if (batch != null) { + List<BatchRequest> batchRequests = batch.getBatchRequests(); + if (batchRequests != null) { + Collections.sort(batchRequests); + ListIterator<BatchRequest> iterator = batchRequests.listIterator(); + firstBatchOrderId = iterator.next().getOrderId(); + } + } + if (firstBatchOrderId == null) { + throw new AmbariException("Can't schedule RequestExecution with no batches"); + } + return firstBatchOrderId; + } + /** * Persist jobs based on the request batch and create trigger for the first * job * @param requestExecution * @throws AmbariException */ - public void scheduleBatch(RequestExecution requestExecution) + public void scheduleAllBatches(RequestExecution requestExecution) throws AmbariException { + Long firstBatchOrderId = getFirstJobOrderId(requestExecution); + scheduleBatch(requestExecution, firstBatchOrderId); + } + + /** + * Persist jobs based on the request batches staring from the defined batch and create trigger for the first + * job + * @param requestExecution + * @throws AmbariException + */ + public void scheduleBatch(RequestExecution requestExecution, long startingBatchOrderId) throws AmbariException { if (!isSchedulerAvailable()) { @@ -297,15 +338,18 @@ public class ExecutionScheduleManager { LOG.error("Unable to determine scheduler state.", e); throw new AmbariException("Scheduler unavailable."); } + LOG.debug("Scheduling jobs starting from " + startingBatchOrderId); // Create and persist jobs based on batches - JobDetail firstJobDetail = persistBatch(requestExecution); + JobDetail firstJobDetail = persistBatch(requestExecution, startingBatchOrderId); if (firstJobDetail == null) { throw new AmbariException("Unable to schedule jobs. 
firstJobDetail = " + firstJobDetail); } + Integer failedCount = countFailedTasksBeforeStartingBatch(requestExecution, startingBatchOrderId); + // Create a cron trigger for the first batch job // If no schedule is specified create simple trigger to fire right away Schedule schedule = requestExecution.getSchedule(); @@ -332,6 +376,7 @@ public class ExecutionScheduleManager { .withSchedule(cronSchedule(triggerExpression) .withMisfireHandlingInstructionFireAndProceed()) .forJob(firstJobDetail) + .usingJobData(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, failedCount) .startAt(startDate) .endAt(endDate) .build(); @@ -351,6 +396,7 @@ public class ExecutionScheduleManager { .withIdentity(REQUEST_EXECUTION_TRIGGER_PREFIX + "-" + requestExecution.getId(), ExecutionJob.LINEAR_EXECUTION_TRIGGER_GROUP) .withSchedule(simpleSchedule().withMisfireHandlingInstructionFireNow()) + .usingJobData(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, failedCount) .startNow() .build(); @@ -364,7 +410,35 @@ public class ExecutionScheduleManager { } } - private JobDetail persistBatch(RequestExecution requestExecution) + private Integer countFailedTasksBeforeStartingBatch(RequestExecution requestExecution, long startingBatchOrderId) throws AmbariException { + int result = 0; + Batch batch = requestExecution.getBatch(); + if (batch != null) { + List<BatchRequest> batchRequests = batch.getBatchRequests(); + if (batchRequests != null) { + Collections.sort(batchRequests); + for (BatchRequest batchRequest : batchRequests) { + if (batchRequest.getOrderId() >= startingBatchOrderId) break; + + if (batchRequest.getRequestId() != null) { + BatchRequestResponse batchRequestResponse = getBatchRequestResponse(batchRequest.getRequestId(), requestExecution.getClusterName()); + if (batchRequestResponse != null) { + result += batchRequestResponse.getFailedTaskCount() + + batchRequestResponse.getAbortedTaskCount() + + batchRequestResponse.getTimedOutTaskCount(); + } + } + } + } + } + return result; + } + + /** + * Creates and stores the chain of BatchRequestJobs - quartz jobs - in order from the last order Id to the startingBatchOrderId + * @return the quartz job that corresponds to startingBatchOrderId + */ + private JobDetail persistBatch(RequestExecution requestExecution, long startingBatchOrderId) throws AmbariException { Batch batch = requestExecution.getBatch(); @@ -376,7 +450,8 @@ public class ExecutionScheduleManager { Collections.sort(batchRequests); ListIterator<BatchRequest> iterator = batchRequests.listIterator(batchRequests.size()); String nextJobName = null; - while (iterator.hasPrevious()) { + long nextBatchOrderId = batchRequests.size(); + while (nextBatchOrderId != startingBatchOrderId && iterator.hasPrevious()) { BatchRequest batchRequest = iterator.previous(); String jobName = getJobName(requestExecution.getId(), @@ -409,6 +484,7 @@ public class ExecutionScheduleManager { } nextJobName = jobName; + nextBatchOrderId = batchRequest.getOrderId(); } } } @@ -421,14 +497,60 @@ public class ExecutionScheduleManager { } /** - * Delete and re-create all jobs and triggers - * Update schedule for a batch + * Pause/resume/abort request schedule and related jobs and triggers * @param requestExecution */ public void updateBatchSchedule(RequestExecution requestExecution) throws AmbariException { + BatchRequest activeBatch = calculateActiveBatch(requestExecution); + if (activeBatch == null) { + LOG.warn("Ignoring RequestExecution status update since all batches has been executed"); + return; + } + if 
(requestExecution.getStatus().equals(SCHEDULED.name())) { + scheduleBatch(requestExecution, activeBatch.getOrderId()); + } else if (requestExecution.getStatus().equals(PAUSED.name()) || + requestExecution.getStatus().equals(ABORTED.name())) { + LOG.info("Request execution status changed to " + requestExecution.getStatus() + " for request schedule " + + requestExecution.getId() + ". Deleting related jobs."); + deleteJobs(requestExecution, activeBatch.getOrderId()); + Collection<Long> requestIDsToAbort = requestExecution.getBatchRequestRequestsIDs(activeBatch.getOrderId()); + for (Long requestId : requestIDsToAbort) { + //might be null if the request is for not long running job + if (requestId == null) continue; + abortRequestById(requestExecution, requestId); + } + } + } + + /** + * Iterate through the batches and find the first one with not completed status, if all were completed return null + * @param requestExecution + * @return + */ + private BatchRequest calculateActiveBatch(RequestExecution requestExecution) { + BatchRequest result = null; + Batch batch = requestExecution.getBatch(); + if (batch != null) { + List<BatchRequest> batchRequests = batch.getBatchRequests(); + if (batchRequests != null) { + Collections.sort(batchRequests); + ListIterator<BatchRequest> iterator = batchRequests.listIterator(); + do { + result = iterator.next(); + } while (iterator.hasNext() && + HostRoleStatus.getCompletedStates().contains(HostRoleStatus.valueOf(result.getStatus())) && + !HostRoleStatus.ABORTED.name().equals(result.getStatus())); + } + } + + if (result != null && + HostRoleStatus.getCompletedStates().contains(HostRoleStatus.valueOf(result.getStatus())) && + !HostRoleStatus.ABORTED.name().equals(result.getStatus())) { + return null; + } - // TODO: Support delete and update if no jobs are running + return result; } /** @@ -483,6 +605,15 @@ public class ExecutionScheduleManager { * @throws AmbariException */ public void deleteAllJobs(RequestExecution requestExecution) throws AmbariException { + Long firstBatchOrderId = getFirstJobOrderId(requestExecution); + deleteJobs(requestExecution, firstBatchOrderId); + } + + /** + * Delete all jobs and triggers if possible. 
+ * @throws AmbariException + */ + public void deleteJobs(RequestExecution requestExecution, Long startingBatchOrderId) throws AmbariException { if (!isSchedulerAvailable()) { throw new AmbariException("Scheduler unavailable."); } @@ -492,7 +623,11 @@ public class ExecutionScheduleManager { if (batch != null) { List<BatchRequest> batchRequests = batch.getBatchRequests(); if (batchRequests != null) { + Collections.sort(batchRequests); for (BatchRequest batchRequest : batchRequests) { + //skip all before starting batch + if (batchRequest.getOrderId() < startingBatchOrderId) continue; + String jobName = getJobName(requestExecution.getId(), batchRequest.getOrderId()); @@ -540,6 +675,8 @@ public class ExecutionScheduleManager { actionDBAccessor.setSourceScheduleForRequest(batchRequestResponse.getRequestId(), executionId); } + batchRequest.setRequestId(batchRequestResponse.getRequestId()); + return batchRequestResponse.getRequestId(); } catch (Exception e) { throw new AmbariException("Exception occurred while performing request", e); @@ -566,6 +703,33 @@ public class ExecutionScheduleManager { } + protected RequestStatus abortRequestById(RequestExecution requestExecution, Long requestId) throws AmbariException { + LOG.debug("Aborting request " + requestId); + ResourceProvider provider = + AbstractControllerResourceProvider.getResourceProvider(Resource.Type.Request); + + Map<String, Object> properties = new HashMap<>(); + properties.put(RequestResourceProvider.REQUEST_CLUSTER_NAME_PROPERTY_ID, requestExecution.getClusterName()); + properties.put(RequestResourceProvider.REQUEST_ABORT_REASON_PROPERTY_ID, "Request execution status changed to " + requestExecution.getStatus()); + properties.put(RequestResourceProvider.REQUEST_ID_PROPERTY_ID, Long.toString(requestId)); + properties.put(RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID, HostRoleStatus.ABORTED.name()); + + org.apache.ambari.server.controller.spi.Request request = new RequestImpl(Collections.emptySet(), + Collections.singleton(properties), Collections.emptyMap(), null); + + Predicate predicate = new PredicateBuilder() + .property(RequestResourceProvider.REQUEST_CLUSTER_NAME_PROPERTY_ID) + .equals(requestExecution.getClusterName()).and() + .property(RequestResourceProvider.REQUEST_ID_PROPERTY_ID) + .equals(Long.toString(requestId)).toPredicate(); + + try{ + return provider.updateResources(request, predicate); + } catch (Exception e) { + throw new AmbariException("Error while aborting the request.", e); + } + } + private BatchRequestResponse convertToBatchRequestResponse(ClientResponse clientResponse) { BatchRequestResponse batchRequestResponse = new BatchRequestResponse(); int retCode = clientResponse.getStatus(); @@ -638,6 +802,18 @@ public class ExecutionScheduleManager { return batchRequestResponse; } + public String getBatchRequestStatus(Long executionId, String clusterName) throws AmbariException { + Cluster cluster = clusters.getCluster(clusterName); + RequestExecution requestExecution = cluster.getAllRequestExecutions().get(executionId); + + if (requestExecution == null) { + throw new AmbariException("Unable to find request schedule with id = " + + executionId); + } + + return requestExecution.getStatus(); + } + public void updateBatchRequest(long executionId, long batchId, String clusterName, BatchRequestResponse batchRequestResponse, boolean statusOnly) throws AmbariException { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java 
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java index 3063d29..a6d194f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java @@ -22,6 +22,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; public class BatchRequest implements Comparable<BatchRequest> { private Long orderId; + private Long requestId; private Type type; private String uri; private String body; @@ -38,6 +39,15 @@ public class BatchRequest implements Comparable<BatchRequest> { this.orderId = orderId; } + @JsonProperty("request_id") + public Long getRequestId() { + return requestId; + } + + public void setRequestId(Long requestId) { + this.requestId = requestId; + } + @JsonProperty("request_type") public String getType() { return type.name(); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java index 6d9c40b..f91b6f5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java @@ -17,6 +17,8 @@ */ package org.apache.ambari.server.state.scheduler; +import java.util.Collection; + import org.apache.ambari.server.controller.RequestScheduleResponse; /** @@ -161,6 +163,10 @@ public interface RequestExecution { String getRequestBody(Long batchId); /** + * Get the requests IDs for the batch + */ + Collection<Long> getBatchRequestRequestsIDs(long batchId); + /** * Get batch request with specified order id */ BatchRequest getBatchRequest(long batchId); @@ -184,6 +190,8 @@ public interface RequestExecution { enum Status { SCHEDULED, COMPLETED, - DISABLED + DISABLED, + ABORTED, + PAUSED } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java index 104ca9b..fe18b9a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java @@ -17,14 +17,17 @@ */ package org.apache.ambari.server.state.scheduler; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.controller.RequestScheduleResponse; import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.HostDAO; @@ -115,6 +118,10 @@ public class RequestExecutionImpl implements RequestExecution { batchRequestEntities) { BatchRequest batchRequest = new BatchRequest(); batchRequest.setOrderId(batchRequestEntity.getBatchId()); + batchRequest.setRequestId(batchRequestEntity.getRequestId()); + if (batchRequestEntity.getRequestBody() != null) { + batchRequest.setBody(new String(batchRequestEntity.getRequestBody())); + } batchRequest.setType(BatchRequest.Type.valueOf(batchRequestEntity.getRequestType())); batchRequest.setUri(batchRequestEntity.getRequestUri()); 
batchRequest.setStatus(batchRequestEntity.getRequestStatus()); @@ -133,6 +140,13 @@ public class RequestExecutionImpl implements RequestExecution { schedule.setStartTime(requestScheduleEntity.getStartTime()); schedule.setEndTime(requestScheduleEntity.getEndTime()); + //if all values are nulls set the general scheduler to null + if (schedule.getDayOfWeek() == null && schedule.getDaysOfMonth() == null && + schedule.getMinutes() == null && schedule.getHours() == null && + schedule.getMonth() == null && schedule.getYear() == null && + schedule.getStartTime() == null && schedule.getEndTime() == null) { + schedule = null; + } isPersisted = true; } @@ -276,6 +290,7 @@ public class RequestExecutionImpl implements RequestExecution { RequestScheduleBatchRequestEntity batchRequestEntity = new RequestScheduleBatchRequestEntity(); batchRequestEntity.setBatchId(batchRequest.getOrderId()); + batchRequestEntity.setRequestId(batchRequest.getRequestId()); batchRequestEntity.setScheduleId(requestScheduleEntity.getScheduleId()); batchRequestEntity.setRequestScheduleEntity(requestScheduleEntity); batchRequestEntity.setRequestType(batchRequest.getType()); @@ -428,6 +443,22 @@ public class RequestExecutionImpl implements RequestExecution { } @Override + public Collection<Long> getBatchRequestRequestsIDs(long batchId) { + Collection<Long> requestIDs = new ArrayList<>(); + if (requestScheduleEntity != null) { + Collection<RequestScheduleBatchRequestEntity> requestEntities = + requestScheduleEntity.getRequestScheduleBatchRequestEntities(); + if (requestEntities != null) { + requestIDs.addAll(requestEntities.stream() + .filter(requestEntity -> requestEntity.getBatchId().equals(batchId)) + .map(RequestScheduleBatchRequestEntity::getRequestId) + .collect(Collectors.toList())); + } + } + return requestIDs; + } + + @Override public BatchRequest getBatchRequest(long batchId) { for (BatchRequest batchRequest : batch.getBatchRequests()) { if (batchId == batchRequest.getOrderId()) { @@ -452,6 +483,14 @@ public class RequestExecutionImpl implements RequestExecution { } } + // Rare race condition when batch request finished during pausing the request execution, + //in this case the job details will be deleted, + //so we mark it as not completed because otherwise the job detail will be lost + //and the whole Request Execution status will not be set to COMPLETED at the end. 
+ if (Status.PAUSED.name().equals(getStatus()) && HostRoleStatus.COMPLETED.name().equals(batchRequestResponse.getStatus())) { + batchRequestResponse.setStatus(HostRoleStatus.ABORTED.name()); + } + if (batchRequestEntity != null) { batchRequestEntity.setRequestStatus(batchRequestResponse.getStatus()); diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java index 3f4a5d5..3097279 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java @@ -33,6 +33,7 @@ import static org.junit.Assert.assertThat; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -216,7 +217,7 @@ public class ExecutionScheduleManagerTest { RequestExecution requestExecution = createRequestExecution(true); Assert.assertNotNull(requestExecution); - executionScheduleManager.scheduleBatch(requestExecution); + executionScheduleManager.scheduleAllBatches(requestExecution); String jobName1 = executionScheduleManager.getJobName(requestExecution .getId(), 10L); @@ -280,7 +281,7 @@ public class ExecutionScheduleManagerTest { RequestExecution requestExecution = createRequestExecution(true); Assert.assertNotNull(requestExecution); - executionScheduleManager.scheduleBatch(requestExecution); + executionScheduleManager.scheduleAllBatches(requestExecution); String jobName1 = executionScheduleManager.getJobName(requestExecution .getId(), 10L); @@ -308,7 +309,7 @@ public class ExecutionScheduleManagerTest { RequestExecution requestExecution = createRequestExecution(false); Assert.assertNotNull(requestExecution); - executionScheduleManager.scheduleBatch(requestExecution); + executionScheduleManager.scheduleAllBatches(requestExecution); String jobName1 = executionScheduleManager.getJobName(requestExecution .getId(), 10L); @@ -387,6 +388,9 @@ public class ExecutionScheduleManagerTest { expect(batchRequestMock.getUri()).andReturn(uri).once(); expect(batchRequestMock.getType()).andReturn(type).once(); + batchRequestMock.setRequestId(5L); + expectLastCall().once(); + expect(scheduleManager.performApiRequest(eq(uri), eq(body), eq(type), eq(userId))).andReturn(batchRequestResponse).once(); scheduleManager.updateBatchRequest(eq(executionId), eq(batchId), eq(clusterName), eq(batchRequestResponse), eq(false)); @@ -721,4 +725,153 @@ public class ExecutionScheduleManagerTest { assertEquals("http://localhost:8080/", scheduleManager.extendApiResource(webResource, "").getURI().toString()); } + + @Test + public void testUpdateBatchSchedulePause() throws Exception { + Clusters clustersMock = createMock(Clusters.class); + Cluster clusterMock = createMock(Cluster.class); + Configuration configurationMock = createNiceMock(Configuration.class); + ExecutionScheduler executionSchedulerMock = createMock(ExecutionScheduler.class); + InternalTokenStorage tokenStorageMock = createMock(InternalTokenStorage.class); + ActionDBAccessor actionDBAccessorMock = createMock(ActionDBAccessor.class); + Gson gson = new Gson(); + RequestExecution requestExecutionMock = createMock(RequestExecution.class); + Batch batchMock = createMock(Batch.class); + JobDetail jobDetailMock = createMock(JobDetail.class); + final BatchRequest batchRequestMock1 = 
createMock(BatchRequest.class); + final BatchRequest batchRequestMock2 = createMock(BatchRequest.class); + final Trigger triggerMock = createNiceMock(Trigger.class); + final List<Trigger> triggers = new ArrayList<Trigger>() {{ add(triggerMock); }}; + + long executionId = 11L; + String clusterName = "c1"; + Date pastDate = new Date(new Date().getTime() - 2); + + Map<Long, RequestExecution> executionMap = new HashMap<>(); + executionMap.put(executionId, requestExecutionMock); + + EasyMock.expect(configurationMock.getApiSSLAuthentication()).andReturn(Boolean.FALSE); + EasyMock.replay(configurationMock); + + ExecutionScheduleManager scheduleManager = + createMockBuilder(ExecutionScheduleManager.class) + .withConstructor(configurationMock, executionSchedulerMock, tokenStorageMock, + clustersMock, actionDBAccessorMock, gson) + .addMockedMethods("deleteJobs", "abortRequestById").createMock(); + + expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes(); + expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes(); + expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes(); + expect(batchMock.getBatchRequests()).andReturn + (new ArrayList<BatchRequest>() {{ + add(batchRequestMock1); + add(batchRequestMock2); + }}); + expect(batchRequestMock1.getOrderId()).andReturn(1L).anyTimes(); + expect(batchRequestMock1.getStatus()).andReturn(HostRoleStatus.COMPLETED.name()).anyTimes(); + expect(batchRequestMock1.compareTo(batchRequestMock2)).andReturn(-1).anyTimes(); + expect(batchRequestMock2.compareTo(batchRequestMock1)).andReturn(1).anyTimes(); + expect(batchRequestMock2.getOrderId()).andReturn(3L).anyTimes(); + expect(batchRequestMock2.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS.name()).anyTimes(); + expect(executionSchedulerMock.getJobDetail((JobKey) anyObject())) + .andReturn(jobDetailMock).anyTimes(); + expect((List<Trigger>) executionSchedulerMock + .getTriggersForJob((JobKey) anyObject())).andReturn(triggers).anyTimes(); + expect(triggerMock.mayFireAgain()).andReturn(true).anyTimes(); + expect(triggerMock.getFinalFireTime()).andReturn(pastDate).anyTimes(); + + expect(requestExecutionMock.getStatus()).andReturn(RequestExecution.Status.PAUSED.name()).anyTimes(); + expect(requestExecutionMock.getId()).andReturn(executionId).anyTimes(); + expect(requestExecutionMock.getBatchRequestRequestsIDs(3L)).andReturn(Collections.singleton(5L)).anyTimes(); + + //deletes only second batch, the first was completed + scheduleManager.deleteJobs(eq(requestExecutionMock), eq(3L)); + expectLastCall().once(); + //second batch request needs to be aborted + expect(scheduleManager.abortRequestById(requestExecutionMock, 5L)).andReturn(null).once(); + + replay(clustersMock, clusterMock, requestExecutionMock, + executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1, batchRequestMock2, + triggerMock, jobDetailMock, actionDBAccessorMock); + + scheduleManager.updateBatchSchedule(requestExecutionMock); + + verify(clustersMock, clusterMock, configurationMock, requestExecutionMock, + executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1, batchRequestMock2, + triggerMock, jobDetailMock, actionDBAccessorMock); + } + + @Test + public void testUpdateBatchScheduleUnpause() throws Exception { + Clusters clustersMock = createMock(Clusters.class); + Cluster clusterMock = createMock(Cluster.class); + Configuration configurationMock = createNiceMock(Configuration.class); + ExecutionScheduler executionSchedulerMock = 
createMock(ExecutionScheduler.class); + InternalTokenStorage tokenStorageMock = createMock(InternalTokenStorage.class); + ActionDBAccessor actionDBAccessorMock = createMock(ActionDBAccessor.class); + Gson gson = new Gson(); + RequestExecution requestExecutionMock = createMock(RequestExecution.class); + Batch batchMock = createMock(Batch.class); + JobDetail jobDetailMock = createMock(JobDetail.class); + final BatchRequest batchRequestMock1 = createMock(BatchRequest.class); + final BatchRequest batchRequestMock2 = createMock(BatchRequest.class); + final Trigger triggerMock = createNiceMock(Trigger.class); + final List<Trigger> triggers = new ArrayList<Trigger>() {{ add(triggerMock); }}; + + long executionId = 11L; + String clusterName = "c1"; + Date pastDate = new Date(new Date().getTime() - 2); + + Map<Long, RequestExecution> executionMap = new HashMap<>(); + executionMap.put(executionId, requestExecutionMock); + + EasyMock.expect(configurationMock.getApiSSLAuthentication()).andReturn(Boolean.FALSE); + EasyMock.replay(configurationMock); + + ExecutionScheduleManager scheduleManager = + createMockBuilder(ExecutionScheduleManager.class) + .withConstructor(configurationMock, executionSchedulerMock, tokenStorageMock, + clustersMock, actionDBAccessorMock, gson) + .addMockedMethods("scheduleBatch").createMock(); + + expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes(); + expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes(); + expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes(); + expect(batchMock.getBatchRequests()).andReturn + (new ArrayList<BatchRequest>() {{ + add(batchRequestMock1); + add(batchRequestMock2); + }}); + expect(batchRequestMock1.getOrderId()).andReturn(1L).anyTimes(); + expect(batchRequestMock1.getStatus()).andReturn(HostRoleStatus.FAILED.name()).anyTimes(); + expect(batchRequestMock1.compareTo(batchRequestMock2)).andReturn(-1).anyTimes(); + expect(batchRequestMock2.compareTo(batchRequestMock1)).andReturn(1).anyTimes(); + expect(batchRequestMock2.getOrderId()).andReturn(3L).anyTimes(); + expect(batchRequestMock2.getStatus()).andReturn(HostRoleStatus.PENDING.name()).once(); + expect(executionSchedulerMock.getJobDetail((JobKey) anyObject())) + .andReturn(jobDetailMock).anyTimes(); + expect((List<Trigger>) executionSchedulerMock + .getTriggersForJob((JobKey) anyObject())).andReturn(triggers).anyTimes(); + expect(triggerMock.mayFireAgain()).andReturn(true).anyTimes(); + expect(triggerMock.getFinalFireTime()).andReturn(pastDate).anyTimes(); + + expect(requestExecutionMock.getStatus()).andReturn(RequestExecution.Status.SCHEDULED.name()).anyTimes(); + expect(requestExecutionMock.getId()).andReturn(executionId).anyTimes(); + expect(requestExecutionMock.getBatchRequestRequestsIDs(3L)).andReturn(Collections.singleton(5L)).anyTimes(); + + //schedule staring from second batch, the first was completed + scheduleManager.scheduleBatch(eq(requestExecutionMock), eq(3L)); + expectLastCall().once(); + + replay(clustersMock, clusterMock, requestExecutionMock, + executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1, batchRequestMock2, + triggerMock, jobDetailMock, actionDBAccessorMock); + + scheduleManager.updateBatchSchedule(requestExecutionMock); + + verify(clustersMock, clusterMock, configurationMock, requestExecutionMock, + executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1, batchRequestMock2, + triggerMock, jobDetailMock, actionDBAccessorMock); + } + } diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java index bdcdb76..6e30c40 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java @@ -304,7 +304,8 @@ public class RequestExecutionTest { } Assert.assertNotNull(postBatchRequest); // Not read by default - Assert.assertNull(postBatchRequest.getBody()); + Assert.assertNotNull(postBatchRequest.getBody()); + Assert.assertEquals("testBody", postBatchRequest.getBody()); RequestScheduleResponse requestScheduleResponse = requestExecution .convertToResponseWithBody();
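
Example usage of the new PUT endpoint — a minimal client sketch, not part of the patch. The host, credentials, cluster name, schedule id, the /api/v1 prefix, the X-Requested-By header and the "RequestSchedule" JSON envelope are illustrative assumptions; only the URL path shape (/clusters/{clusterName}/request_schedules/{requestScheduleId}) and the SCHEDULED/PAUSED/ABORTED status values are taken from this change.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    // Sketch: flip the status of an existing request schedule.
    // "PAUSED" pauses a running rolling restart, "SCHEDULED" resumes it
    // from the first unfinished batch, "ABORTED" stops it and aborts the
    // in-flight request of the active batch.
    public class RequestScheduleStatusClient {

      public static void main(String[] args) throws Exception {
        updateStatus("c1", 25, "PAUSED");   // cluster name and id are placeholders
      }

      static void updateStatus(String cluster, long scheduleId, String status) throws Exception {
        // Assumed Ambari server location and API prefix.
        URL url = new URL("http://ambari-host:8080/api/v1/clusters/" + cluster
            + "/request_schedules/" + scheduleId);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        String auth = Base64.getEncoder()
            .encodeToString("admin:admin".getBytes(StandardCharsets.UTF_8));   // assumed credentials
        conn.setRequestProperty("Authorization", "Basic " + auth);
        conn.setRequestProperty("X-Requested-By", "ambari");   // assumed CSRF guard header

        // The resource provider now updates only the fields that are present,
        // so sending just the status leaves the batch and schedule definitions untouched.
        String body = "{\"RequestSchedule\":{\"status\":\"" + status + "\"}}";
        try (OutputStream out = conn.getOutputStream()) {
          out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
      }
    }

Design note on resume: when a paused schedule is set back to SCHEDULED, updateBatchSchedule() finds the first batch that is not in a completed state and calls scheduleBatch(requestExecution, startingBatchOrderId) for it, while countFailedTasksBeforeStartingBatch() seeds the new trigger's JobDataMap with the failed, aborted and timed-out task counts of the batches that already ran, so the per-batch failure accounting introduced in AMBARI-24682 continues across the pause.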