This is an automated email from the ASF dual-hosted git repository. adoroszlai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new f837a6c AMBARI-22805. Blueprints do not handle some failures properly f837a6c is described below commit f837a6cead0fa27118ab37da3eadf40338af0537 Author: Doroszlai, Attila <adorosz...@apache.org> AuthorDate: Tue Jan 16 14:11:07 2018 +0100 AMBARI-22805. Blueprints do not handle some failures properly --- .../controller/internal/CalculatedStatus.java | 7 +- .../internal/HostComponentResourceProvider.java | 35 +--- .../internal/RequestResourceProvider.java | 31 +-- .../orm/entities/TopologyHostRequestEntity.java | 27 +++ .../server/topology/AsyncCallableService.java | 16 +- .../ambari/server/topology/HostOfferResponse.java | 23 ++- .../apache/ambari/server/topology/HostRequest.java | 52 ++++- .../ambari/server/topology/LogicalRequest.java | 44 ++++ .../ambari/server/topology/PersistedState.java | 5 + .../ambari/server/topology/PersistedStateImpl.java | 11 + .../ambari/server/topology/TopologyManager.java | 54 +++-- .../server/topology/tasks/TopologyHostTask.java | 4 + .../ambari/server/topology/tasks/TopologyTask.java | 20 +- .../ambari/server/upgrade/SchemaUpgradeHelper.java | 1 + .../ambari/server/upgrade/UpgradeCatalog262.java | 70 +++++++ .../src/main/resources/Ambari-DDL-Derby-CREATE.sql | 2 + .../src/main/resources/Ambari-DDL-MySQL-CREATE.sql | 2 + .../main/resources/Ambari-DDL-Oracle-CREATE.sql | 2 + .../main/resources/Ambari-DDL-Postgres-CREATE.sql | 2 + .../resources/Ambari-DDL-SQLAnywhere-CREATE.sql | 2 + .../main/resources/Ambari-DDL-SQLServer-CREATE.sql | 2 + .../internal/RequestResourceProviderTest.java | 223 ++++++++++----------- .../server/topology/AsyncCallableServiceTest.java | 29 ++- .../server/topology/ConfigureClusterTaskTest.java | 2 +- .../server/topology/TopologyManagerTest.java | 7 + 25 files changed, 464 insertions(+), 209 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java index c7450b3..1b854d4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java @@ -62,7 +62,7 @@ public class CalculatedStatus { private final double percent; /** - * A status which represents a COMPLETED state at 0% + * A status which represents a COMPLETED state at 100% */ public static final CalculatedStatus COMPLETED = new CalculatedStatus(HostRoleStatus.COMPLETED, HostRoleStatus.COMPLETED, 100.0); @@ -73,6 +73,11 @@ public class CalculatedStatus { public static final CalculatedStatus PENDING = new CalculatedStatus(HostRoleStatus.PENDING, HostRoleStatus.PENDING, 0.0); + /** + * A status which represents an ABORTED state at -1% + */ + public static final CalculatedStatus ABORTED = new CalculatedStatus(HostRoleStatus.ABORTED, HostRoleStatus.ABORTED, -1); + // ----- Constructors ------------------------------------------------------ /** diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java index cf58325..e1bef2a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java @@ -828,31 +828,18 @@ public class HostComponentResourceProvider extends AbstractControllerResourcePro @Override public RequestStageContainer invoke() throws AmbariException { RequestStageContainer stageContainer = null; - int retriesRemaining = 100; - do { - try { - stageContainer = updateHostComponents(stages, requests, request.getRequestInfoProperties(), - runSmokeTest); - } catch (Exception e) { - if (--retriesRemaining == 0) { - LOG.info("Caught an exception while updating host components, will not try again: {}", e.getMessage(), e); - // !!! IllegalArgumentException results in a 400 response, RuntimeException results in 500. - if (IllegalArgumentException.class.isInstance(e)) { - throw (IllegalArgumentException) e; - } else { - throw new RuntimeException("Update Host request submission failed: " + e, e); - } - } else { - LOG.info("Caught an exception while updating host components, retrying : " + e); - try { - Thread.sleep(250); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Update Host request submission failed: " + e, e); - } - } + try { + stageContainer = updateHostComponents(stages, requests, request.getRequestInfoProperties(), + runSmokeTest); + } catch (Exception e) { + LOG.info("Caught an exception while updating host components, will not try again: {}", e.getMessage(), e); + // !!! IllegalArgumentException results in a 400 response, RuntimeException results in 500. + if (e instanceof IllegalArgumentException) { + throw (IllegalArgumentException) e; + } else { + throw new RuntimeException("Update Host request submission failed: " + e, e); } - } while (stageContainer == null); + } return stageContainer; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java index eb8334b..3f932e6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java @@ -73,7 +73,9 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.inject.Inject; @@ -132,9 +134,11 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider public static final String HOSTS_PREDICATE = "hosts_predicate"; public static final String ACTION_ID = "action"; public static final String INPUTS_ID = "parameters"; - public static final String EXLUSIVE_ID = "exclusive"; + public static final String EXCLUSIVE_ID = "exclusive"; public static final String HAS_RESOURCE_FILTERS = "HAS_RESOURCE_FILTERS"; + private static final Set<String> PK_PROPERTY_IDS = ImmutableSet.of(REQUEST_ID_PROPERTY_ID); + private PredicateCompiler predicateCompiler = new PredicateCompiler(); /** @@ -425,7 +429,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider @Override protected Set<String> getPKPropertyIds() { - return new HashSet<>(Collections.singletonList(REQUEST_ID_PROPERTY_ID)); + return PK_PROPERTY_IDS; } @@ -483,8 +487,8 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider } boolean exclusive = false; - if (requestInfoProperties.containsKey(EXLUSIVE_ID)) { - exclusive = Boolean.valueOf(requestInfoProperties.get(EXLUSIVE_ID).trim()); + if (requestInfoProperties.containsKey(EXCLUSIVE_ID)) { + exclusive = Boolean.valueOf(requestInfoProperties.get(EXCLUSIVE_ID).trim()); } return new ExecuteActionRequest( @@ -756,7 +760,8 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider } setResourceProperty(resource, REQUEST_ID_PROPERTY_ID, entity.getRequestId(), requestedPropertyIds); - setResourceProperty(resource, REQUEST_CONTEXT_ID, entity.getRequestContext(), requestedPropertyIds); + String requestContext = entity.getRequestContext(); + setResourceProperty(resource, REQUEST_CONTEXT_ID, requestContext, requestedPropertyIds); setResourceProperty(resource, REQUEST_TYPE_ID, entity.getRequestType(), requestedPropertyIds); // Mask any sensitive data fields in the inputs data structure @@ -807,15 +812,13 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider final CalculatedStatus status; LogicalRequest logicalRequest = topologyManager.getRequest(entity.getRequestId()); if (summary.isEmpty() && null != logicalRequest) { - // In this case, it appears that there are no tasks but this is a logical - // topology request, so it's a matter of hosts simply not registering yet - // for tasks to be created ==> status = PENDING. - // For a new LogicalRequest there should be at least one HostRequest, - // while if they were removed already ==> status = COMPLETED. - if (logicalRequest.getHostRequests().isEmpty()) { - status = CalculatedStatus.COMPLETED; - } else { - status = CalculatedStatus.PENDING; + status = logicalRequest.calculateStatus(); + if (status == CalculatedStatus.ABORTED) { + Optional<String> failureReason = logicalRequest.getFailureReason(); + if (failureReason.isPresent()) { + requestContext += "\nFAILED: " + failureReason.get(); + setResourceProperty(resource, REQUEST_CONTEXT_ID, requestContext, requestedPropertyIds); + } } } else { // there are either tasks or this is not a logical request, so do normal diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java index fa8d033..9576fd9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java @@ -22,6 +22,8 @@ import java.util.Collection; import javax.persistence.CascadeType; import javax.persistence.Column; import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.ManyToOne; @@ -30,6 +32,8 @@ import javax.persistence.NamedQuery; import javax.persistence.OneToMany; import javax.persistence.Table; +import org.apache.ambari.server.actionmanager.HostRoleStatus; + @Entity @Table(name = "topology_host_request") @NamedQueries({ @@ -47,6 +51,13 @@ public class TopologyHostRequestEntity { @Column(name = "host_name", length = 255) private String hostName; + @Column(name = "status") + @Enumerated(EnumType.STRING) + private HostRoleStatus status; + + @Column(name = "status_message") + private String statusMessage; + @ManyToOne @JoinColumn(name = "logical_request_id", referencedColumnName = "id", nullable = false) private TopologyLogicalRequestEntity topologyLogicalRequestEntity; @@ -90,6 +101,22 @@ public class TopologyHostRequestEntity { this.hostName = hostName; } + public HostRoleStatus getStatus() { + return status; + } + + public void setStatus(HostRoleStatus status) { + this.status = status; + } + + public String getStatusMessage() { + return statusMessage; + } + + public void setStatusMessage(String statusMessage) { + this.statusMessage = statusMessage; + } + public TopologyLogicalRequestEntity getTopologyLogicalRequestEntity() { return topologyLogicalRequestEntity; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java index db57378..ecd2133 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java @@ -25,11 +25,13 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; /** * Callable service implementation for executing tasks asynchronously. @@ -55,12 +57,13 @@ public class AsyncCallableService<T> implements Callable<T> { // the delay between two consecutive execution trials in milliseconds private final long retryDelay; + private final Consumer<Throwable> onError; - public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName) { - this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1)); + public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, Consumer<Throwable> onError) { + this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1), onError); } - public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService) { + public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService, Consumer<Throwable> onError) { Preconditions.checkArgument(retryDelay > 0, "retryDelay should be positive"); this.task = task; @@ -68,6 +71,7 @@ public class AsyncCallableService<T> implements Callable<T> { this.timeout = timeout; this.retryDelay = retryDelay; this.taskName = taskName; + this.onError = onError; } @Override @@ -78,6 +82,7 @@ public class AsyncCallableService<T> implements Callable<T> { LOG.info("Task {} execution started at {}", taskName, startTime); while (true) { + Throwable lastError; try { LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft); T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS); @@ -85,18 +90,21 @@ public class AsyncCallableService<T> implements Callable<T> { return taskResult; } catch (TimeoutException e) { LOG.debug("Task {} timeout", taskName); + lastError = e; timeLeft = 0; } catch (ExecutionException e) { - Throwable cause = e.getCause(); + Throwable cause = Throwables.getRootCause(e); if (!(cause instanceof RetryTaskSilently)) { LOG.info(String.format("Task %s exception during execution", taskName), cause); } + lastError = cause; timeLeft = timeout - (System.currentTimeMillis() - startTime); } if (timeLeft < retryDelay) { attemptToCancel(future); LOG.warn("Task {} timeout exceeded, no more retries", taskName); + onError.accept(lastError); return null; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java index e220c50..2c29e5d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java @@ -22,7 +22,8 @@ package org.apache.ambari.server.topology; import java.util.List; import java.util.concurrent.Executor; -import org.apache.ambari.server.topology.tasks.TopologyTask; +import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.topology.tasks.TopologyHostTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,9 +41,9 @@ final class HostOfferResponse { private final Answer answer; private final String hostGroupName; private final long hostRequestId; - private final List<TopologyTask> tasks; + private final List<TopologyHostTask> tasks; - static HostOfferResponse createAcceptedResponse(long hostRequestId, String hostGroupName, List<TopologyTask> tasks) { + static HostOfferResponse createAcceptedResponse(long hostRequestId, String hostGroupName, List<TopologyHostTask> tasks) { return new HostOfferResponse(Answer.ACCEPTED, hostRequestId, hostGroupName, tasks); } @@ -50,7 +51,7 @@ final class HostOfferResponse { this(answer, -1, null, null); } - private HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List<TopologyTask> tasks) { + private HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List<TopologyHostTask> tasks) { this.answer = answer; this.hostRequestId = hostRequestId; this.hostGroupName = hostGroupName; @@ -78,12 +79,20 @@ final class HostOfferResponse { executor.execute(new Runnable() { @Override public void run() { - for (TopologyTask task : tasks) { - LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType()); - task.run(); + for (TopologyHostTask task : tasks) { + try { + LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType()); + task.run(); + } catch (Exception e) { + HostRequest hostRequest = task.getHostRequest(); + LOG.error("{} task for host {} failed due to", task.getType(), hostRequest.getHostName(), e); + hostRequest.markHostRequestFailed(HostRoleStatus.ABORTED, e, ambariContext.getPersistedTopologyState()); + break; + } } } }); } } + } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java index 7045912..13f89b5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.predicate.InvalidQueryException; import org.apache.ambari.server.api.predicate.PredicateCompiler; import org.apache.ambari.server.controller.internal.HostResourceProvider; @@ -45,11 +46,14 @@ import org.apache.ambari.server.topology.tasks.InstallHostTask; import org.apache.ambari.server.topology.tasks.PersistHostResourcesTask; import org.apache.ambari.server.topology.tasks.RegisterWithConfigGroupTask; import org.apache.ambari.server.topology.tasks.StartHostTask; +import org.apache.ambari.server.topology.tasks.TopologyHostTask; import org.apache.ambari.server.topology.tasks.TopologyTask; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import com.google.common.base.Optional; +import com.google.common.base.Throwables; /** * Represents a set of requests to a single host such as install, start, etc. @@ -69,6 +73,8 @@ public class HostRequest implements Comparable<HostRequest> { private final long id; private boolean isOutstanding = true; private final boolean skipFailure; + private HostRoleStatus status = HostRoleStatus.PENDING; + private String statusMessage; private Map<TopologyTask, Map<String, Long>> logicalTaskMap = new HashMap<>(); @@ -77,7 +83,7 @@ public class HostRequest implements Comparable<HostRequest> { // logical task id -> physical tasks private Map<Long, Long> physicalTasks = new HashMap<>(); - private List<TopologyTask> topologyTasks = new ArrayList<>(); + private List<TopologyHostTask> topologyTasks = new ArrayList<>(); private ClusterTopology topology; @@ -119,6 +125,8 @@ public class HostRequest implements Comparable<HostRequest> { hostgroupName = entity.getTopologyHostGroupEntity().getName(); hostGroup = topology.getBlueprint().getHostGroup(hostgroupName); hostname = entity.getHostName(); + setStatus(entity.getStatus()); + statusMessage = entity.getStatusMessage(); this.predicate = toPredicate(predicate); containsMaster = hostGroup.containsMasterComponent(); this.topology = topology; @@ -134,6 +142,15 @@ public class HostRequest implements Comparable<HostRequest> { (hostname == null ? "Host Assignment Pending" : hostname)); } + void markHostRequestFailed(HostRoleStatus status, Throwable cause, PersistedState persistedState) { + String errorMessage = StringUtils.substringBefore(Throwables.getRootCause(cause).getMessage(), "\n"); + LOG.info("HostRequest: marking host request {} for {} as {} due to {}", id, hostname, status, errorMessage); + abortPendingTasks(); + setStatus(status); + setStatusMessage(errorMessage); + persistedState.setHostRequestStatus(id, status, errorMessage); + } + //todo: synchronization public synchronized HostOfferResponse offer(Host host) { if (!isOutstanding) { @@ -149,6 +166,24 @@ public class HostRequest implements Comparable<HostRequest> { } } + public HostRoleStatus getStatus() { + return status; + } + + public void setStatus(HostRoleStatus status) { + if (status != null) { + this.status = status; + } + } + + public void setStatusMessage(String errorMessage) { + this.statusMessage = errorMessage; + } + + public Optional<String> getStatusMessage() { + return Optional.fromNullable(statusMessage); + } + public void setHostName(String hostName) { hostname = hostName; } @@ -307,7 +342,7 @@ public class HostRequest implements Comparable<HostRequest> { } } - public List<TopologyTask> getTopologyTasks() { + public List<TopologyHostTask> getTopologyTasks() { return topologyTasks; } @@ -341,6 +376,9 @@ public class HostRequest implements Comparable<HostRequest> { logicalTask.setStructuredOut(physicalTask.getStructuredOut()); } } + if (logicalTask.getStatus() == HostRoleStatus.PENDING && status != HostRoleStatus.PENDING) { + logicalTask.setStatus(status); + } } return logicalTasks.values(); } @@ -440,6 +478,14 @@ public class HostRequest implements Comparable<HostRequest> { getLogicalTask(logicalTaskId).incrementAttemptCount(); } + public void abortPendingTasks() { + for (HostRoleCommand command : getLogicalTasks()) { + if (command.getStatus() == HostRoleStatus.PENDING) { + command.setStatus(HostRoleStatus.ABORTED); + } + } + } + private Predicate toPredicate(String predicate) { Predicate compiledPredicate = null; try { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java index b63bbad..b9bbe2c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java @@ -38,6 +38,7 @@ import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RequestStatusResponse; import org.apache.ambari.server.controller.ShortTaskStatus; +import org.apache.ambari.server.controller.internal.CalculatedStatus; import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity; @@ -49,6 +50,7 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; import com.google.common.collect.Iterables; @@ -405,6 +407,40 @@ public class LogicalRequest extends Request { return removed; } + /** + * @return true if all the tasks in the logical request are in completed state, false otherwise + */ + public boolean isFinished() { + for (ShortTaskStatus ts : getRequestStatus().getTasks()) { + if (!HostRoleStatus.valueOf(ts.getStatus()).isCompletedState()) { + return false; + } + } + return true; + } + + /** + * Returns if all the tasks in the logical request have completed state. + */ + public boolean isSuccessful() { + for (ShortTaskStatus ts : getRequestStatus().getTasks()) { + if (HostRoleStatus.valueOf(ts.getStatus()) != HostRoleStatus.COMPLETED) { + return false; + } + } + return true; + } + + public Optional<String> getFailureReason() { + for (HostRequest request : getHostRequests()) { + Optional<String> failureReason = request.getStatusMessage(); + if (failureReason.isPresent()) { + return failureReason; + } + } + return Optional.absent(); + } + private void createHostRequests(TopologyRequest request, ClusterTopology topology) { Map<String, HostGroupInfo> hostGroupInfoMap = request.getHostGroupInfo(); Blueprint blueprint = topology.getBlueprint(); @@ -523,4 +559,12 @@ public class LogicalRequest extends Request { } return controller; } + + public CalculatedStatus calculateStatus() { + return !isFinished() + ? CalculatedStatus.PENDING + : isSuccessful() + ? CalculatedStatus.COMPLETED + : CalculatedStatus.ABORTED; + } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java index 369bf52..a0b96e6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.controller.internal.BaseClusterRequest; import org.apache.ambari.server.state.Host; @@ -86,4 +87,8 @@ public interface PersistedState { */ void removeHostRequests(long logicalRequestId, Collection<HostRequest> hostRequests); + /** + * Update the status of the given host request. + */ + void setHostRequestStatus(long hostRequestId, HostRoleStatus status, String message); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java index 63898ba..bb1f852 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java @@ -28,6 +28,7 @@ import javax.inject.Singleton; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.predicate.InvalidQueryException; import org.apache.ambari.server.controller.internal.BaseClusterRequest; import org.apache.ambari.server.orm.dao.HostDAO; @@ -142,6 +143,16 @@ public class PersistedStateImpl implements PersistedState { } @Override + public void setHostRequestStatus(long hostRequestId, HostRoleStatus status, String message) { + TopologyHostRequestEntity hostRequestEntity = hostRequestDAO.findById(hostRequestId); + if (hostRequestEntity != null) { + hostRequestEntity.setStatus(status); + hostRequestEntity.setStatusMessage(message); + hostRequestDAO.merge(hostRequestEntity); + } + } + + @Override public void registerPhysicalTask(long logicalTaskId, long physicalTaskId) { TopologyLogicalTaskEntity entity = topologyLogicalTaskDAO.findById(logicalTaskId); HostRoleCommandEntity physicalEntity = hostRoleCommandDAO.findByPK(physicalTaskId); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index 6bdc896..6da7671 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -33,6 +33,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.inject.Inject; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleCommand; @@ -41,7 +44,6 @@ import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintP import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RequestStatusResponse; -import org.apache.ambari.server.controller.ShortTaskStatus; import org.apache.ambari.server.controller.internal.ArtifactResourceProvider; import org.apache.ambari.server.controller.internal.BaseClusterRequest; import org.apache.ambari.server.controller.internal.CalculatedStatus; @@ -80,7 +82,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.eventbus.Subscribe; -import com.google.inject.Inject; import com.google.inject.Singleton; import com.google.inject.persist.Transactional; @@ -337,7 +338,7 @@ public class TopologyManager { clusterTopologyMap.put(clusterId, topology); - addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true, + addClusterConfigRequest(logicalRequest, topology, new ClusterConfigurationRequest(ambariContext, topology, true, stackAdvisorBlueprintProcessor, securityType == SecurityType.KERBEROS)); // Process the logical request @@ -1048,9 +1049,17 @@ public class TopologyManager { if (!configChecked) { configChecked = true; if (!ambariContext.isTopologyResolved(topology.getClusterId())) { - LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, adding cluster config request"); - addClusterConfigRequest(topology, new ClusterConfigurationRequest( - ambariContext, topology, false, stackAdvisorBlueprintProcessor)); + if (provisionRequest == null) { + LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, but provision request missing, skipping cluster config request"); + } else if (provisionRequest.isFinished()) { + LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, but provision request is finished, skipping cluster config request"); + } else { + LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, adding cluster config request"); + ClusterConfigurationRequest configRequest = new ClusterConfigurationRequest(ambariContext, topology, false, stackAdvisorBlueprintProcessor); + addClusterConfigRequest(provisionRequest, topology, configRequest); + } + } else { + getOrCreateTopologyTaskExecutor(topology.getClusterId()).start(); } } } @@ -1058,36 +1067,17 @@ public class TopologyManager { } /** - * @param logicalRequest * @return true if all the tasks in the logical request are in completed state, false otherwise */ private boolean isLogicalRequestFinished(LogicalRequest logicalRequest) { - if(logicalRequest != null) { - boolean completed = true; - for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) { - if(!HostRoleStatus.valueOf(ts.getStatus()).isCompletedState()) { - completed = false; - } - } - return completed; - } - return false; + return logicalRequest != null && logicalRequest.isFinished(); } /** * Returns if all the tasks in the logical request have completed state. - * @param logicalRequest - * @return */ private boolean isLogicalRequestSuccessful(LogicalRequest logicalRequest) { - if(logicalRequest != null) { - for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) { - if(HostRoleStatus.valueOf(ts.getStatus()) != HostRoleStatus.COMPLETED) { - return false; - } - } - } - return true; + return logicalRequest != null && logicalRequest.isSuccessful(); } //todo: this should invoke a callback on each 'service' in the topology @@ -1118,9 +1108,15 @@ public class TopologyManager { * @param topology cluster topology * @param configurationRequest configuration request to be executed */ - private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) { + private void addClusterConfigRequest(final LogicalRequest logicalRequest, ClusterTopology topology, ClusterConfigurationRequest configurationRequest) { ConfigureClusterTask task = configureClusterTaskFactory.createConfigureClusterTask(topology, configurationRequest, ambariEventPublisher); - executor.submit(new AsyncCallableService<>(task, task.getTimeout(), task.getRepeatDelay(),"ConfigureClusterTask")); + executor.submit(new AsyncCallableService<>(task, task.getTimeout(), task.getRepeatDelay(),"ConfigureClusterTask", throwable -> { + HostRoleStatus status = throwable instanceof TimeoutException ? HostRoleStatus.TIMEDOUT : HostRoleStatus.FAILED; + LOG.info("ConfigureClusterTask failed, marking host requests {}", status); + for (HostRequest hostRequest : logicalRequest.getHostRequests()) { + hostRequest.markHostRequestFailed(status, throwable, persistedState); + } + })); } /** diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java index e016ec8..c1fc99a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java @@ -39,6 +39,10 @@ public abstract class TopologyHostTask implements TopologyTask { this.hostRequest = hostRequest; } + public HostRequest getHostRequest() { + return hostRequest; + } + /** * Run with an InternalAuthenticationToken as when running these tasks we might not have any active security context. */ diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java index bb20349..f0cbffa 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java @@ -19,6 +19,12 @@ package org.apache.ambari.server.topology.tasks; +import java.util.Set; + +import org.apache.ambari.server.RoleCommand; + +import com.google.common.collect.ImmutableSet; + /** * Task which is executed by the TopologyManager. */ @@ -30,7 +36,19 @@ public interface TopologyTask extends Runnable { RESOURCE_CREATION, CONFIGURE, INSTALL, - START + START { + @Override + public Set<RoleCommand> tasksToAbortOnFailure() { + return ImmutableSet.of(RoleCommand.START); + } + }, + ; + + private static Set<RoleCommand> ALL_TASKS = ImmutableSet.of(RoleCommand.INSTALL, RoleCommand.START); + + public Set<RoleCommand> tasksToAbortOnFailure() { + return ALL_TASKS; + } } /** diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java index 89fd1fb..1523506 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java @@ -186,6 +186,7 @@ public class SchemaUpgradeHelper { catalogBinder.addBinding().to(UpgradeCatalog252.class); catalogBinder.addBinding().to(UpgradeCatalog260.class); catalogBinder.addBinding().to(UpgradeCatalog261.class); + catalogBinder.addBinding().to(UpgradeCatalog262.class); catalogBinder.addBinding().to(UpgradeCatalog300.class); catalogBinder.addBinding().to(UpdateAlertScriptPaths.class); catalogBinder.addBinding().to(FinalUpgradeCatalog.class); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog262.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog262.java new file mode 100644 index 0000000..f83204d --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog262.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.upgrade; + +import java.sql.SQLException; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.orm.DBAccessor; + +import com.google.inject.Inject; +import com.google.inject.Injector; + +/** + * The {@link UpgradeCatalog262} upgrades Ambari from 2.6.1 to 2.6.2. + */ +public class UpgradeCatalog262 extends AbstractUpgradeCatalog { + + private static final String HOST_REQUEST_TABLE = "topology_host_request"; + private static final String STATUS_COLUMN = "status"; + private static final String STATUS_MESSAGE_COLUMN = "status_message"; + + @Inject + public UpgradeCatalog262(Injector injector) { + super(injector); + } + + @Override + public String getSourceVersion() { + return "2.6.1"; + } + + @Override + public String getTargetVersion() { + return "2.6.2"; + } + + @Override + protected void executeDDLUpdates() throws AmbariException, SQLException { + addHostRequestStatusColumn(); + } + + private void addHostRequestStatusColumn() throws SQLException { + dbAccessor.addColumn(HOST_REQUEST_TABLE, new DBAccessor.DBColumnInfo(STATUS_COLUMN, String.class, 255, null, true)); + dbAccessor.addColumn(HOST_REQUEST_TABLE, new DBAccessor.DBColumnInfo(STATUS_MESSAGE_COLUMN, String.class, 1024, null, true)); + } + + @Override + protected void executePreDMLUpdates() throws AmbariException, SQLException { + } + + @Override + protected void executeDMLUpdates() throws AmbariException, SQLException { + } + +} diff --git a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql index fb432b9..0e940ae 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql @@ -774,6 +774,8 @@ CREATE TABLE topology_host_request ( group_id BIGINT NOT NULL, stage_id BIGINT NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql index 53a2872..da14dcb 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql @@ -792,6 +792,8 @@ CREATE TABLE topology_host_request ( group_id BIGINT NOT NULL, stage_id BIGINT NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql index 326cb4b..d5cf42a 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql @@ -771,6 +771,8 @@ CREATE TABLE topology_host_request ( group_id NUMBER(19) NOT NULL, stage_id NUMBER(19) NOT NULL, host_name VARCHAR(255), + status VARCHAR2(255), + status_message VARCHAR2(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql index f4e0757..f2b3da3 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql @@ -775,6 +775,8 @@ CREATE TABLE topology_host_request ( group_id BIGINT NOT NULL, stage_id BIGINT NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql index 3bfa60f..a62a7d2 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql @@ -769,6 +769,8 @@ CREATE TABLE topology_host_request ( group_id NUMERIC(19) NOT NULL, stage_id NUMERIC(19) NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql index 0d7d885..d84cd70 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql @@ -791,6 +791,8 @@ CREATE TABLE topology_host_request ( group_id BIGINT NOT NULL, stage_id BIGINT NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY CLUSTERED (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java index b3f2c3c..d8cb701 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java @@ -28,11 +28,13 @@ import static org.easymock.EasyMock.newCapture; import static org.powermock.api.easymock.PowerMock.createMock; import static org.powermock.api.easymock.PowerMock.createNiceMock; import static org.powermock.api.easymock.PowerMock.replay; +import static org.powermock.api.easymock.PowerMock.replayAll; import static org.powermock.api.easymock.PowerMock.reset; +import static org.powermock.api.easymock.PowerMock.resetAll; import static org.powermock.api.easymock.PowerMock.verify; +import static org.powermock.api.easymock.PowerMock.verifyAll; import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -99,7 +101,11 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -1159,9 +1165,7 @@ public class RequestResourceProviderTest { capture(requestCapture), capture(predicateCapture))).andReturn(Collections.singleton(resource)); // replay - replay(managementController, response, controller, - hostComponentProcessResourceProvider, resource, clusters); - PowerMock.replayAll(); + replayAll(); SecurityContextHolder.getContext().setAuthentication( TestAuthenticationFactory.createAdministrator()); @@ -1626,125 +1630,106 @@ public class RequestResourceProviderTest { } /** - * Tests that topology requests return different status (PENDING) if there are - * no tasks. Normal requests should return COMPLETED. - * - * @throws Exception + * Tests that if there are no tasks, topology requests return status they get from the logical request. */ @Test @PrepareForTest(AmbariServer.class) public void testGetLogicalRequestStatusWithNoTasks() throws Exception { - // Given - Resource.Type type = Resource.Type.Request; - - AmbariManagementController managementController = createMock(AmbariManagementController.class); - ActionManager actionManager = createNiceMock(ActionManager.class); - - Clusters clusters = createNiceMock(Clusters.class); - - RequestEntity requestMock = createNiceMock(RequestEntity.class); - - expect(requestMock.getRequestContext()).andReturn("this is a context").anyTimes(); - expect(requestMock.getRequestId()).andReturn(100L).anyTimes(); - Capture<Collection<Long>> requestIdsCapture = Capture.newInstance(); - - - ClusterTopology topology = createNiceMock(ClusterTopology.class); - - HostGroup hostGroup = createNiceMock(HostGroup.class); - expect(hostGroup.getName()).andReturn("host_group_1").anyTimes(); - - Blueprint blueprint = createNiceMock(Blueprint.class); - expect(blueprint.getHostGroup("host_group_1")).andReturn(hostGroup).anyTimes(); - expect(topology.getClusterId()).andReturn(2L).anyTimes(); - - Long clusterId = 2L; - String clusterName = "cluster1"; - Cluster cluster = createNiceMock(Cluster.class); - expect(cluster.getClusterId()).andReturn(clusterId).anyTimes(); - expect(cluster.getClusterName()).andReturn(clusterName).anyTimes(); - - expect(managementController.getActionManager()).andReturn(actionManager).anyTimes(); - expect(managementController.getClusters()).andReturn(clusters).anyTimes(); - expect(clusters.getCluster(eq(clusterName))).andReturn(cluster).anyTimes(); - expect(clusters.getClusterById(clusterId)).andReturn(cluster).anyTimes(); - expect(requestDAO.findByPks(capture(requestIdsCapture), eq(true))).andReturn(Lists.newArrayList(requestMock)); - expect(hrcDAO.findAggregateCounts((Long) anyObject())).andReturn( - Collections.emptyMap()).anyTimes(); - - Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>(); - HostGroupInfo hostGroupInfo = new HostGroupInfo("host_group_1"); - hostGroupInfo.setRequestedCount(1); - hostGroupInfoMap.put("host_group_1", hostGroupInfo); - - TopologyRequest topologyRequest = createNiceMock(TopologyRequest.class); - expect(topologyRequest.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes(); - expect(topology.getBlueprint()).andReturn(blueprint).anyTimes(); - expect(blueprint.shouldSkipFailure()).andReturn(true).anyTimes(); - - - - PowerMock.mockStatic(AmbariServer.class); - expect(AmbariServer.getController()).andReturn(managementController).anyTimes(); - - PowerMock.replayAll( - topologyRequest, - topology, - blueprint, - managementController, - clusters); - - - LogicalRequest logicalRequest = createNiceMock(LogicalRequest.class); - Collection<HostRequest> hostRequests = new ArrayList<>(); - HostRequest hostRequest = createNiceMock(HostRequest.class); - hostRequests.add(hostRequest); - expect(logicalRequest.getHostRequests()).andReturn(hostRequests).anyTimes(); - expect(logicalRequest.constructNewPersistenceEntity()).andReturn(requestMock).anyTimes(); - - reset(topologyManager); - - expect(topologyManager.getRequest(100L)).andReturn(logicalRequest).anyTimes(); - - - expect(topologyManager.getRequests(eq(Collections.singletonList(100L)))).andReturn( - Collections.singletonList(logicalRequest)).anyTimes(); - expect(topologyManager.getStageSummaries(EasyMock.<Long>anyObject())).andReturn( - Collections.emptyMap()).anyTimes(); - - replay(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest, hostRequest); - - ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider( - type, - managementController); - - Set<String> propertyIds = ImmutableSet.of( - RequestResourceProvider.REQUEST_ID_PROPERTY_ID, - RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID, - RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID - ); - - Predicate predicate = new PredicateBuilder(). - property(RequestResourceProvider.REQUEST_ID_PROPERTY_ID).equals("100"). - toPredicate(); - - Request request = PropertyHelper.getReadRequest(propertyIds); - - // When - Set<Resource> resources = provider.getResources(request, predicate); - - // Then - - - // verify - PowerMock.verifyAll(); - verify(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest, hostRequest); - - Assert.assertEquals(1, resources.size()); - for (Resource resource : resources) { - Assert.assertEquals(100L, (long)(Long) resource.getPropertyValue(RequestResourceProvider.REQUEST_ID_PROPERTY_ID)); - Assert.assertEquals("PENDING", resource.getPropertyValue(RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID)); - Assert.assertEquals(0.0, resource.getPropertyValue(RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID)); + Iterable<CalculatedStatus> statusList = ImmutableList.of(CalculatedStatus.COMPLETED, CalculatedStatus.PENDING, CalculatedStatus.ABORTED); + for (CalculatedStatus calculatedStatus : statusList) { + // Given + resetAll(); + + PowerMock.mockStatic(AmbariServer.class); + AmbariManagementController managementController = createMock(AmbariManagementController.class); + ActionManager actionManager = createNiceMock(ActionManager.class); + Clusters clusters = createNiceMock(Clusters.class); + Cluster cluster = createNiceMock(Cluster.class); + RequestEntity requestMock = createNiceMock(RequestEntity.class); + Blueprint blueprint = createNiceMock(Blueprint.class); + ClusterTopology topology = createNiceMock(ClusterTopology.class); + HostGroup hostGroup = createNiceMock(HostGroup.class); + TopologyRequest topologyRequest = createNiceMock(TopologyRequest.class); + LogicalRequest logicalRequest = createNiceMock(LogicalRequest.class); + HostRequest hostRequest = createNiceMock(HostRequest.class); + + Long requestId = 100L; + Long clusterId = 2L; + String clusterName = "cluster1"; + String hostGroupName = "host_group_1"; + HostGroupInfo hostGroupInfo = new HostGroupInfo(hostGroupName); + hostGroupInfo.setRequestedCount(1); + Map<String, HostGroupInfo> hostGroupInfoMap = ImmutableMap.of(hostGroupName, hostGroupInfo); + Collection<HostRequest> hostRequests = Collections.singletonList(hostRequest); + Map<Long, HostRoleCommandStatusSummaryDTO> dtoMap = Collections.emptyMap(); + + expect(AmbariServer.getController()).andReturn(managementController).anyTimes(); + expect(requestMock.getRequestContext()).andReturn("this is a context").anyTimes(); + expect(requestMock.getRequestId()).andReturn(requestId).anyTimes(); + expect(hostGroup.getName()).andReturn(hostGroupName).anyTimes(); + expect(blueprint.getHostGroup(hostGroupName)).andReturn(hostGroup).anyTimes(); + expect(topology.getClusterId()).andReturn(2L).anyTimes(); + expect(cluster.getClusterId()).andReturn(clusterId).anyTimes(); + expect(cluster.getClusterName()).andReturn(clusterName).anyTimes(); + expect(managementController.getActionManager()).andReturn(actionManager).anyTimes(); + expect(managementController.getClusters()).andReturn(clusters).anyTimes(); + expect(clusters.getCluster(eq(clusterName))).andReturn(cluster).anyTimes(); + expect(clusters.getClusterById(clusterId)).andReturn(cluster).anyTimes(); + Collection<Long> requestIds = anyObject(); + expect(requestDAO.findByPks(requestIds, eq(true))).andReturn(Lists.newArrayList(requestMock)); + expect(hrcDAO.findAggregateCounts((Long) anyObject())).andReturn(dtoMap).anyTimes(); + expect(topologyManager.getRequest(requestId)).andReturn(logicalRequest).anyTimes(); + expect(topologyManager.getRequests(eq(Collections.singletonList(requestId)))).andReturn(Collections.singletonList(logicalRequest)).anyTimes(); + expect(topologyManager.getStageSummaries(EasyMock.<Long>anyObject())).andReturn(dtoMap).anyTimes(); + + expect(topologyRequest.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes(); + expect(topology.getBlueprint()).andReturn(blueprint).anyTimes(); + expect(blueprint.shouldSkipFailure()).andReturn(true).anyTimes(); + + expect(logicalRequest.getHostRequests()).andReturn(hostRequests).anyTimes(); + expect(logicalRequest.constructNewPersistenceEntity()).andReturn(requestMock).anyTimes(); + expect(logicalRequest.calculateStatus()).andReturn(calculatedStatus).anyTimes(); + Optional<String> failureReason = calculatedStatus == CalculatedStatus.ABORTED + ? Optional.of("some reason") + : Optional.<String>absent(); + expect(logicalRequest.getFailureReason()).andReturn(failureReason).anyTimes(); + + replayAll(); + + Resource.Type type = Resource.Type.Request; + ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider( + type, + managementController + ); + + Set<String> propertyIds = ImmutableSet.of( + RequestResourceProvider.REQUEST_ID_PROPERTY_ID, + RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID, + RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID, + RequestResourceProvider.REQUEST_CONTEXT_ID + ); + + Predicate predicate = new PredicateBuilder(). + property(RequestResourceProvider.REQUEST_ID_PROPERTY_ID).equals("100"). + toPredicate(); + + Request request = PropertyHelper.getReadRequest(propertyIds); + + // When + Set<Resource> resources = provider.getResources(request, predicate); + + // Then + verifyAll(); + + Assert.assertEquals(1, resources.size()); + Resource resource = Iterables.getOnlyElement(resources); + Assert.assertEquals(requestId, resource.getPropertyValue(RequestResourceProvider.REQUEST_ID_PROPERTY_ID)); + Assert.assertEquals(calculatedStatus.getStatus().toString(), resource.getPropertyValue(RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID)); + Assert.assertEquals(calculatedStatus.getPercent(), resource.getPropertyValue(RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID)); + + Object requestContext = resource.getPropertyValue(RequestResourceProvider.REQUEST_CONTEXT_ID); + Assert.assertNotNull(requestContext); + Assert.assertTrue(!failureReason.isPresent() || requestContext.toString().contains(failureReason.get())); } } } diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java index edc0954..6fdb798 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java @@ -18,13 +18,16 @@ package org.apache.ambari.server.topology; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import org.easymock.EasyMockRule; import org.easymock.EasyMockSupport; @@ -51,19 +54,24 @@ public class AsyncCallableServiceTest extends EasyMockSupport { @Mock private ScheduledFuture<Boolean> futureMock; + @Mock + private Consumer<Throwable> onErrorMock; + private AsyncCallableService<Boolean> asyncCallableService; @Test public void testCallableServiceShouldCancelTaskWhenTimeoutExceeded() throws Exception { // GIVEN long timeout = -1; // guaranteed timeout - expect(futureMock.get(timeout, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException("Testing the timeout exceeded case")); + TimeoutException timeoutException = new TimeoutException("Testing the timeout exceeded case"); + expect(futureMock.get(timeout, TimeUnit.MILLISECONDS)).andThrow(timeoutException); expect(futureMock.isDone()).andReturn(Boolean.FALSE); expect(futureMock.cancel(true)).andReturn(Boolean.TRUE); expect(executorServiceMock.submit(taskMock)).andReturn(futureMock); + onErrorMock.accept(timeoutException); replayAll(); - asyncCallableService = new AsyncCallableService<>(taskMock, timeout, RETRY_DELAY, "test", executorServiceMock); + asyncCallableService = new AsyncCallableService<>(taskMock, timeout, RETRY_DELAY, "test", executorServiceMock, onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); @@ -81,13 +89,16 @@ public class AsyncCallableServiceTest extends EasyMockSupport { Thread.sleep(10000000); return false; }; + onErrorMock.accept(anyObject(TimeoutException.class)); + replayAll(); - asyncCallableService = new AsyncCallableService<>(hangingTask, TIMEOUT, RETRY_DELAY, "test"); + asyncCallableService = new AsyncCallableService<>(hangingTask, TIMEOUT, RETRY_DELAY, "test", onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); // THEN + verifyAll(); Assert.assertNull("No result expected from hanging task", serviceResult); } @@ -95,8 +106,10 @@ public class AsyncCallableServiceTest extends EasyMockSupport { public void testCallableServiceShouldExitWhenTaskCompleted() throws Exception { // GIVEN expect(taskMock.call()).andReturn(Boolean.TRUE); + onErrorMock.accept(anyObject(TimeoutException.class)); + expectLastCall().andThrow(new AssertionError("No error expected")).anyTimes(); replayAll(); - asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test"); + asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); @@ -110,8 +123,9 @@ public class AsyncCallableServiceTest extends EasyMockSupport { public void testCallableServiceShouldRetryTaskExecutionTillTimeoutExceededWhenTaskThrowsException() throws Exception { // GIVEN expect(taskMock.call()).andThrow(new IllegalStateException("****************** TESTING ****************")).times(2, 3); + onErrorMock.accept(anyObject(IllegalStateException.class)); replayAll(); - asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test"); + asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); @@ -129,13 +143,16 @@ public class AsyncCallableServiceTest extends EasyMockSupport { Callable<Boolean> throwingTask = () -> { throw new IllegalStateException("****************** TESTING ****************"); }; + onErrorMock.accept(anyObject(IllegalStateException.class)); + replayAll(); - asyncCallableService = new AsyncCallableService<>(throwingTask, TIMEOUT, RETRY_DELAY, "test"); + asyncCallableService = new AsyncCallableService<>(throwingTask, TIMEOUT, RETRY_DELAY, "test", onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); // THEN + verifyAll(); Assert.assertNull("No result expected from throwing task", serviceResult); } } diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java index f4afbea..925e6cb 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java @@ -93,7 +93,7 @@ public class ConfigureClusterTaskTest extends EasyMockSupport { clusterConfigurationRequest.process(); replayAll(); - AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, "test"); + AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, "test", t -> {}); // WHEN asyncService.call(); diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java index 5f61c85..c12965d 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java @@ -398,6 +398,7 @@ public class TopologyManagerTest { List<LogicalRequest> requestList = new ArrayList<>(); requestList.add(logicalRequest); expect(logicalRequest.hasPendingHostRequests()).andReturn(false).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(false).anyTimes(); allRequests.put(clusterTopologyMock, requestList); expect(requestStatusResponse.getTasks()).andReturn(Collections.emptyList()).anyTimes(); expect(clusterTopologyMock.isClusterKerberosEnabled()).andReturn(true); @@ -431,6 +432,8 @@ public class TopologyManagerTest { expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); expect(persistedState.getAllRequests()).andReturn(Collections.emptyMap()).anyTimes(); expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(true).anyTimes(); + expect(logicalRequest.isSuccessful()).andReturn(true).anyTimes(); replayAll(); topologyManager.provisionCluster(request); requestFinished(); @@ -453,6 +456,8 @@ public class TopologyManagerTest { expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); expect(persistedState.getAllRequests()).andReturn(Collections.emptyMap()).anyTimes(); expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(true).anyTimes(); + expect(logicalRequest.isSuccessful()).andReturn(false).anyTimes(); replayAll(); topologyManager.provisionCluster(request); requestFinished(); @@ -475,6 +480,7 @@ public class TopologyManagerTest { expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); expect(persistedState.getAllRequests()).andReturn(Collections.emptyMap()).anyTimes(); expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(false).anyTimes(); replayAll(); topologyManager.provisionCluster(request); requestFinished(); @@ -515,6 +521,7 @@ public class TopologyManagerTest { expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); expect(logicalRequest.hasPendingHostRequests()).andReturn(true).anyTimes(); expect(logicalRequest.getCompletedHostRequests()).andReturn(Collections.EMPTY_LIST).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(true).anyTimes(); expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); replayAll(); EasyMock.replay(clusterTopologyMock); -- To stop receiving notification emails like this one, please contact ['"commits@ambari.apache.org" <commits@ambari.apache.org>'].