Repository: ambari Updated Branches: refs/heads/branch-2.5 a742913da -> 93d5fbedc
AMBARI-20809. Support hostgroup downscale by host_count (magyari_sandor) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/93d5fbed Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/93d5fbed Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/93d5fbed Branch: refs/heads/branch-2.5 Commit: 93d5fbedc9d89d5f41d5ecd1163cfdbe2bd8763e Parents: a742913 Author: Sandor Magyari <smagy...@hortonworks.com> Authored: Fri Apr 28 14:42:55 2017 +0200 Committer: Sandor Magyari <smagy...@hortonworks.com> Committed: Fri Apr 28 23:22:05 2017 +0200 ---------------------------------------------------------------------- .../checks/DatabaseConsistencyCheckHelper.java | 70 ++++++++++-------- .../server/controller/RequestRequest.java | 11 +++ .../internal/RequestResourceProvider.java | 77 ++++++++++++++------ .../ambari/server/topology/LogicalRequest.java | 38 +++++++++- .../ambari/server/topology/PersistedState.java | 7 ++ .../server/topology/PersistedStateImpl.java | 10 +++ .../ambari/server/topology/TopologyManager.java | 56 +++++++++++--- .../src/main/resources/properties.json | 2 + .../DatabaseConsistencyCheckHelperTest.java | 7 +- .../internal/RequestResourceProviderTest.java | 33 ++++++--- .../server/topology/LogicalRequestTest.java | 64 ++++++++++++++++ .../server/topology/TopologyManagerTest.java | 9 ++- 12 files changed, 308 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java index c33c4e3..34acc06 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java @@ -382,14 +382,14 @@ public class DatabaseConsistencyCheckHelper { String SELECT_REQUEST_COUNT_QUERY = "select count(tpr.id) from topology_request tpr"; String SELECT_JOINED_COUNT_QUERY = "select count(DISTINCT tpr.id) from topology_request tpr join " + - "topology_logical_request tlr on tpr.id = tlr.request_id join topology_host_request thr on tlr.id = " + - "thr.logical_request_id join topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task " + - "tlt on tht.id = tlt.host_task_id"; + "topology_logical_request tlr on tpr.id = tlr.request_id"; - int topologyRequestCount = 0; - int topologyRequestTablesJoinedCount = 0; + String SELECT_HOST_REQUEST_COUNT_QUERY = "select count(thr.id) from topology_host_request thr"; + + String SELECT_HOST_JOINED_COUNT_QUERY = "select count(DISTINCT thr.id) from topology_host_request thr join " + + "topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task " + + "tlt on tht.id = tlt.host_task_id"; - ResultSet rs = null; Statement statement = null; if (connection == null) { @@ -402,38 +402,25 @@ public class DatabaseConsistencyCheckHelper { try { statement = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE); - rs = statement.executeQuery(SELECT_REQUEST_COUNT_QUERY); - if (rs != null) { - while (rs.next()) { - topologyRequestCount = rs.getInt(1); - } - } - - rs = statement.executeQuery(SELECT_JOINED_COUNT_QUERY); - if (rs != null) { - while (rs.next()) { - topologyRequestTablesJoinedCount = rs.getInt(1); - } - } + int topologyRequestCount = runQuery(statement, SELECT_REQUEST_COUNT_QUERY); + int topologyRequestTablesJoinedCount = runQuery(statement, SELECT_JOINED_COUNT_QUERY); if (topologyRequestCount != topologyRequestTablesJoinedCount) { error("Your topology request hierarchy is not complete for each row in topology_request should exist " + - "at least one raw in topology_logical_request, topology_host_request, topology_host_task, " + - "topology_logical_task."); + "at least one row in topology_logical_request"); } + int topologyHostRequestCount = runQuery(statement, SELECT_HOST_REQUEST_COUNT_QUERY); + int topologyHostRequestTablesJoinedCount = runQuery(statement, SELECT_HOST_JOINED_COUNT_QUERY); + + if (topologyHostRequestCount != topologyHostRequestTablesJoinedCount) { + error("Your topology request hierarchy is not complete for each row in topology_host_request should exist " + + "at least one row in topology_host_task, topology_logical_task."); + } } catch (SQLException e) { LOG.error("Exception occurred during topology request tables check: ", e); } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - LOG.error("Exception occurred during result set closing procedure: ", e); - } - } - if (statement != null) { try { statement.close(); @@ -445,6 +432,31 @@ public class DatabaseConsistencyCheckHelper { } + private static int runQuery(Statement statement, String query) { + ResultSet rs = null; + int result = 0; + try { + rs = statement.executeQuery(query); + + if (rs != null) { + while (rs.next()) { + result = rs.getInt(1); + } + } + + } catch (SQLException e) { + LOG.error("Exception occurred during topology request tables check: ", e); + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + LOG.error("Exception occurred during result set closing procedure: ", e); + } + } + } + return result; + } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java index db9268b..05c4bad 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java @@ -37,6 +37,8 @@ public class RequestRequest { private String abortReason; + private boolean removePendingHostRequests = false; + public HostRoleStatus getStatus() { return status; @@ -70,6 +72,14 @@ public class RequestRequest { this.abortReason = abortReason; } + public boolean isRemovePendingHostRequests() { + return removePendingHostRequests; + } + + public void setRemovePendingHostRequests(boolean removePendingHostRequests) { + this.removePendingHostRequests = removePendingHostRequests; + } + @Override public String toString() { return "RequestRequest{" + @@ -77,6 +87,7 @@ public class RequestRequest { ", requestId=" + requestId + ", status=" + status + ", abortReason='" + abortReason + '\'' + + ", removePendingHostRequests='" + removePendingHostRequests + '\'' + '}'; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java index 0690ee7..81463a0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java @@ -115,6 +115,9 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider protected static final String REQUEST_COMPLETED_TASK_CNT_ID = "Requests/completed_task_count"; protected static final String REQUEST_QUEUED_TASK_CNT_ID = "Requests/queued_task_count"; protected static final String REQUEST_PROGRESS_PERCENT_ID = "Requests/progress_percent"; + protected static final String REQUEST_REMOVE_PENDING_HOST_REQUESTS_ID = "Requests/remove_pending_host_requests"; + protected static final String REQUEST_PENDING_HOST_REQUEST_COUNT_ID = "Requests/pending_host_request_count"; + protected static final String COMMAND_ID = "command"; protected static final String SERVICE_ID = "service_name"; protected static final String COMPONENT_ID = "component_name"; @@ -152,7 +155,9 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider REQUEST_TIMED_OUT_TASK_CNT_ID, REQUEST_COMPLETED_TASK_CNT_ID, REQUEST_QUEUED_TASK_CNT_ID, - REQUEST_PROGRESS_PERCENT_ID); + REQUEST_PROGRESS_PERCENT_ID, + REQUEST_REMOVE_PENDING_HOST_REQUESTS_ID, + REQUEST_PENDING_HOST_REQUEST_COUNT_ID); // ----- Constructors ---------------------------------------------------- @@ -297,6 +302,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider public RequestStatus updateResources(Request requestInfo, Predicate predicate) throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { + AmbariManagementController amc = getManagementController(); final Set<RequestRequest> requests = new HashSet<RequestRequest>(); @@ -319,33 +325,48 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider } // There should be only one request with this id (or no request at all) org.apache.ambari.server.actionmanager.Request internalRequest = internalRequests.get(0); - // Validate update request (check constraints on state value and presence of abort reason) - if (updateRequest.getAbortReason() == null || updateRequest.getAbortReason().isEmpty()) { - throw new IllegalArgumentException("Abort reason can not be empty."); - } - if (updateRequest.getStatus() != HostRoleStatus.ABORTED) { - throw new IllegalArgumentException( - String.format("%s is wrong value. The only allowed value " + - "for updating request status is ABORTED", - updateRequest.getStatus())); - } + if (updateRequest.isRemovePendingHostRequests()) { + if (internalRequest instanceof LogicalRequest) { + targets.add(internalRequest); + } else { + throw new IllegalArgumentException("Request with id: " + internalRequest.getRequestId() + "is not a Logical Request."); + } + } else { + // Validate update request (check constraints on state value and presence of abort reason) + if (updateRequest.getAbortReason() == null || updateRequest.getAbortReason().isEmpty()) { + throw new IllegalArgumentException("Abort reason can not be empty."); + } + + if (updateRequest.getStatus() != HostRoleStatus.ABORTED) { + throw new IllegalArgumentException( + String.format("%s is wrong value. The only allowed value " + + "for updating request status is ABORTED", + updateRequest.getStatus())); + } - HostRoleStatus internalRequestStatus = - CalculatedStatus.statusFromStages(internalRequest.getStages()).getStatus(); + HostRoleStatus internalRequestStatus = + CalculatedStatus.statusFromStages(internalRequest.getStages()).getStatus(); - if (internalRequestStatus.isCompletedState()) { - // Ignore updates to completed requests to avoid throwing exception on race condition - } else { - // Validation passed - targets.add(internalRequest); + if (internalRequestStatus.isCompletedState()) { + // Ignore updates to completed requests to avoid throwing exception on race condition + } else { + // Validation passed + targets.add(internalRequest); + } } + } + // Perform update Iterator<RequestRequest> reqIterator = requests.iterator(); for (org.apache.ambari.server.actionmanager.Request target : targets) { - String reason = reqIterator.next().getAbortReason(); - amc.getActionManager().cancelRequest(target.getRequestId(), reason); + if (target instanceof LogicalRequest) { + topologyManager.removePendingHostRequests(target.getClusterName(), target.getRequestId()); + } else { + String reason = reqIterator.next().getAbortReason(); + amc.getActionManager().cancelRequest(target.getRequestId(), reason); + } } return getRequestStatus(null); } @@ -363,9 +384,15 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider requestStatus = HostRoleStatus.valueOf(requestStatusStr); } String abortReason = (String) propertyMap.get(REQUEST_ABORT_REASON_PROPERTY_ID); + String removePendingHostRequests = (String) propertyMap.get(REQUEST_REMOVE_PENDING_HOST_REQUESTS_ID); + RequestRequest requestRequest = new RequestRequest(clusterNameStr, requestId); requestRequest.setStatus(requestStatus); requestRequest.setAbortReason(abortReason); + if (removePendingHostRequests != null) { + requestRequest.setRemovePendingHostRequests(Boolean.valueOf(removePendingHostRequests)); + } + return requestRequest; } @@ -753,13 +780,21 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider // in this case, it appears that there are no tasks but this is a logical // topology request, so it's a matter of hosts simply not registering yet // for tasks to be created - status = CalculatedStatus.PENDING; + if (logicalRequest.hasPendingHostRequests()) { + status = CalculatedStatus.PENDING; + } else { + status = CalculatedStatus.COMPLETED; + } } else { // there are either tasks or this is not a logical request, so do normal // status calculations status = CalculatedStatus.statusFromStageSummary(summary, summary.keySet()); } + if (null != logicalRequest) { + setResourceProperty(resource, REQUEST_PENDING_HOST_REQUEST_COUNT_ID, logicalRequest.getPendingHostRequestCount(), requestedPropertyIds); + } + setResourceProperty(resource, REQUEST_STATUS_PROPERTY_ID, status.getStatus().toString(), requestedPropertyIds); setResourceProperty(resource, REQUEST_PROGRESS_PERCENT_ID, status.getPercent(), requestedPropertyIds); http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java index de4211f..eb1c343 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java @@ -164,8 +164,8 @@ public class LogicalRequest extends Request { return requestsWithReservedHosts.keySet(); } - public boolean hasCompleted() { - return requestsWithReservedHosts.isEmpty() && outstandingHostRequests.isEmpty(); + public boolean hasPendingHostRequests() { + return !requestsWithReservedHosts.isEmpty() || !outstandingHostRequests.isEmpty(); } public Collection<HostRequest> getCompletedHostRequests() { @@ -176,11 +176,45 @@ public class LogicalRequest extends Request { return completedHostRequests; } + public int getPendingHostRequestCount() { + return outstandingHostRequests.size() + requestsWithReservedHosts.size(); + } + //todo: this is only here for toEntity() functionality public Collection<HostRequest> getHostRequests() { return new ArrayList<HostRequest>(allHostRequests); } + /** + * Removes pending host requests (outstanding requests not picked up by any host, where hostName is null) for a host group. + * @param hostGroupName + * @return + */ + public Collection<HostRequest> removePendingHostRequests(String hostGroupName) { + Collection<HostRequest> pendingHostRequests = new ArrayList<>(); + for(HostRequest hostRequest : outstandingHostRequests) { + if(hostGroupName == null || hostRequest.getHostgroupName().equals(hostGroupName)) { + pendingHostRequests.add(hostRequest); + } + } + outstandingHostRequests.clear(); + + Collection<String> pendingReservedHostNames = new ArrayList<>(); + for(String reservedHostName : requestsWithReservedHosts.keySet()) { + HostRequest hostRequest = requestsWithReservedHosts.get(reservedHostName); + if(hostGroupName == null || hostRequest.getHostgroupName().equals(hostGroupName)) { + pendingHostRequests.add(hostRequest); + pendingReservedHostNames.add(reservedHostName); + } + } + for (String hostName : pendingReservedHostNames) { + requestsWithReservedHosts.remove(hostName); + } + + allHostRequests.removeAll(pendingHostRequests); + return pendingHostRequests; + } + public Map<String, Collection<String>> getProjectedTopology() { Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>(); http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java index 8003268..2989de6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.topology; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -79,4 +80,10 @@ public interface PersistedState { * @return */ LogicalRequest getProvisionRequest(long clusterId); + + /** + * + * @param hostRequests + */ + void removeHostRequests(Collection<HostRequest> hostRequests); } http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java index 36eb1bc..a8b202e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java @@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; import com.google.inject.Inject; +import com.google.inject.persist.Transactional; /** * Implementation which uses Ambari Database DAO and Entity objects for persistence @@ -119,6 +120,15 @@ public class PersistedStateImpl implements PersistedState { } @Override + @Transactional + public void removeHostRequests(Collection<HostRequest> hostRequests) { + for(HostRequest hostRequest : hostRequests) { + TopologyHostRequestEntity hostRequestEntity = hostRequestDAO.findById(hostRequest.getId()); + hostRequestDAO.remove(hostRequestEntity); + } + } + + @Override public void registerPhysicalTask(long logicalTaskId, long physicalTaskId) { TopologyLogicalTaskEntity entity = topologyLogicalTaskDAO.findById(logicalTaskId); HostRoleCommandEntity physicalEntity = hostRoleCommandDAO.findByPK(physicalTaskId); http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index c192122..8a22e53 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -434,7 +434,7 @@ public class TopologyManager { Map<String, String> requestInfoProps = new HashMap<>(); requestInfoProps.put(org.apache.ambari.server.controller.spi.Request.REQUEST_INFO_BODY_PROPERTY, - "{\"" + ArtifactResourceProvider.ARTIFACT_DATA_PROPERTY + "\": " + descriptor + "}"); + "{\"" + ArtifactResourceProvider.ARTIFACT_DATA_PROPERTY + "\": " + descriptor + "}"); org.apache.ambari.server.controller.spi.Request request = new RequestImpl(Collections.<String>emptySet(), Collections.singleton(properties), requestInfoProps, null); @@ -485,20 +485,55 @@ public class TopologyManager { final Long requestId = ambariContext.getNextRequestId(); LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() { - @Override - public LogicalRequest call() throws Exception { - LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId); + @Override + public LogicalRequest call() throws Exception { + LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId); - return logicalRequest; - } - } + return logicalRequest; + } + } ); - processRequest(request, topology, logicalRequest); - return getRequestStatus(logicalRequest.getRequestId()); } + public void removePendingHostRequests(String clusterName, long requestId) { + ensureInitialized(); + LOG.info("TopologyManager.removePendingHostRequests: Entering"); + + long clusterId = 0; + try { + clusterId = ambariContext.getClusterId(clusterName); + } catch (AmbariException e) { + LOG.error("Unable to retrieve clusterId", e); + throw new IllegalArgumentException("Unable to retrieve clusterId"); + } + ClusterTopology topology = clusterTopologyMap.get(clusterId); + if (topology == null) { + throw new IllegalArgumentException("Unable to retrieve cluster topology for cluster"); + } + + LogicalRequest logicalRequest = allRequests.get(requestId); + if (logicalRequest == null) { + throw new IllegalArgumentException("No Logical Request found for requestId: " + requestId); + } + + Collection<HostRequest> pendingHostRequests = logicalRequest.removePendingHostRequests(null); + + if (!logicalRequest.hasPendingHostRequests()) { + outstandingRequests.remove(logicalRequest); + } + + persistedState.removeHostRequests(pendingHostRequests); + + // set current host count to number of currently connected hosts + for (HostGroupInfo currentHostGroupInfo : topology.getHostGroupInfo().values()) { + currentHostGroupInfo.setRequestedCount(currentHostGroupInfo.getHostNames().size()); + } + + LOG.info("TopologyManager.removePendingHostRequests: Exit"); + } + /** * Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided * provision cluster request and topology. @@ -937,7 +972,7 @@ public class TopologyManager { for (LogicalRequest logicalRequest : requestEntry.getValue()) { allRequests.put(logicalRequest.getRequestId(), logicalRequest); - if (!logicalRequest.hasCompleted()) { + if (logicalRequest.hasPendingHostRequests()) { outstandingRequests.add(logicalRequest); for (String reservedHost : logicalRequest.getReservedHosts()) { reservedHosts.put(reservedHost, logicalRequest); @@ -968,6 +1003,7 @@ public class TopologyManager { } } } + LOG.info("TopologyManager.replayRequests: Exit"); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/main/resources/properties.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/properties.json b/ambari-server/src/main/resources/properties.json index 33854c5..779d0e9 100644 --- a/ambari-server/src/main/resources/properties.json +++ b/ambari-server/src/main/resources/properties.json @@ -135,6 +135,8 @@ "Requests/queued_task_count", "Requests/progress_percent", "Requests/abort_reason", + "Requests/remove_pending_host_requests", + "Requests/pending_host_request_count", "_" ], "RequestSchedule" : [ http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java index 455ce01..5869630 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java @@ -228,7 +228,12 @@ public class DatabaseConsistencyCheckHelperTest { expect(mockJoinResultSet.getInt(1)).andReturn(resultCount); expect(mockStatement.executeQuery("select count(tpr.id) from topology_request tpr")).andReturn(mockCountResultSet); expect(mockStatement.executeQuery("select count(DISTINCT tpr.id) from topology_request tpr join " + - "topology_logical_request tlr on tpr.id = tlr.request_id join topology_host_request thr on tlr.id = thr.logical_request_id join topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task tlt on tht.id = tlt.host_task_id")).andReturn(mockJoinResultSet); + "topology_logical_request tlr on tpr.id = tlr.request_id")).andReturn(mockJoinResultSet); + + expect(mockStatement.executeQuery("select count(thr.id) from topology_host_request thr")).andReturn(mockCountResultSet); + expect(mockStatement.executeQuery("select count(DISTINCT thr.id) from topology_host_request thr join " + + "topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task " + + "tlt on tht.id = tlt.host_task_id")).andReturn(mockJoinResultSet); DatabaseConsistencyCheckHelper.setInjector(mockInjector); DatabaseConsistencyCheckHelper.setConnection(mockConnection); http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java index f7dff11..054e1c8 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java @@ -78,6 +78,7 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.topology.Blueprint; import org.apache.ambari.server.topology.ClusterTopology; +import org.apache.ambari.server.topology.HostGroup; import org.apache.ambari.server.topology.HostGroupInfo; import org.apache.ambari.server.topology.LogicalRequest; import org.apache.ambari.server.topology.TopologyManager; @@ -1649,7 +1650,12 @@ public class RequestResourceProviderTest { ClusterTopology topology = createNiceMock(ClusterTopology.class); + + HostGroup hostGroup = createNiceMock(HostGroup.class); + expect(hostGroup.getName()).andReturn("host_group_1").anyTimes(); + Blueprint blueprint = createNiceMock(Blueprint.class); + expect(blueprint.getHostGroup("host_group_1")).andReturn(hostGroup).anyTimes(); expect(topology.getClusterId()).andReturn(2L).anyTimes(); Long clusterId = 2L; @@ -1666,8 +1672,13 @@ public class RequestResourceProviderTest { expect(hrcDAO.findAggregateCounts((Long) anyObject())).andReturn( Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap()).anyTimes(); + Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>(); + HostGroupInfo hostGroupInfo = new HostGroupInfo("host_group_1"); + hostGroupInfo.setRequestedCount(1); + hostGroupInfoMap.put("host_group_1", hostGroupInfo); + TopologyRequest topologyRequest = createNiceMock(TopologyRequest.class); - expect(topologyRequest.getHostGroupInfo()).andReturn(Collections.<String, HostGroupInfo>emptyMap()).anyTimes(); + expect(topologyRequest.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes(); expect(topology.getBlueprint()).andReturn(blueprint).anyTimes(); expect(blueprint.shouldSkipFailure()).andReturn(true).anyTimes(); @@ -1677,24 +1688,28 @@ public class RequestResourceProviderTest { expect(AmbariServer.getController()).andReturn(managementController).anyTimes(); PowerMock.replayAll( - topologyRequest, - topology, - blueprint, - managementController, - clusters); + topologyRequest, + topology, + blueprint, + managementController, + clusters); - LogicalRequest logicalRequest = new LogicalRequest(200L, topologyRequest, topology); + LogicalRequest logicalRequest = createNiceMock(LogicalRequest.class); + expect(logicalRequest.hasPendingHostRequests()).andReturn(true).anyTimes(); + expect(logicalRequest.constructNewPersistenceEntity()).andReturn(requestMock).anyTimes(); reset(topologyManager); expect(topologyManager.getRequest(100L)).andReturn(logicalRequest).anyTimes(); + + expect(topologyManager.getRequests(eq(Collections.singletonList(100L)))).andReturn( Collections.singletonList(logicalRequest)).anyTimes(); expect(topologyManager.getStageSummaries(EasyMock.<Long>anyObject())).andReturn( Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap()).anyTimes(); - replay(actionManager, requestMock, requestDAO, hrcDAO, topologyManager); + replay(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest); ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider( type, @@ -1722,7 +1737,7 @@ public class RequestResourceProviderTest { // verify PowerMock.verifyAll(); - verify(actionManager, requestMock, requestDAO, hrcDAO, topologyManager); + verify(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest); Assert.assertEquals(1, resources.size()); for (Resource resource : resources) { http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java index 63b7e9f..2dd45b7 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java @@ -461,4 +461,68 @@ public class LogicalRequestTest extends EasyMockSupport { assertTrue(hostReqHost1.isPresent() && hostReqHost2.isPresent() && hostReqHost3.isPresent() && !hostReqHost4.isPresent()); } + + @Test + public void testRemovePendingHostRequests() throws Exception { + // Given + Long requestId = 1L; + + final TopologyHostInfoEntity host1 = new TopologyHostInfoEntity(); + host1.setId(100L); + host1.setFqdn("host1"); + + final TopologyHostInfoEntity host2 = new TopologyHostInfoEntity(); + host2.setId(102L); + host2.setFqdn("host2"); + + TopologyHostGroupEntity hostGroupEntity1 = new TopologyHostGroupEntity(); + hostGroupEntity1.setTopologyHostInfoEntities(ImmutableSet.of(host1, host2)); + hostGroupEntity1.setName("host_group_1"); + + // host request matched to a registered host + TopologyHostRequestEntity hostRequestEntityHost1Matched = new TopologyHostRequestEntity(); + hostRequestEntityHost1Matched.setId(1L); + hostRequestEntityHost1Matched.setHostName(host1.getFqdn()); //host request matched host1 + hostRequestEntityHost1Matched.setTopologyHostGroupEntity(hostGroupEntity1); + hostRequestEntityHost1Matched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet()); + expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host1.getFqdn()))).andReturn(true).anyTimes(); + + + // host request that hasn't been matched to any registered host yet + TopologyHostRequestEntity hostRequestEntityHost2NotMatched = new TopologyHostRequestEntity(); + hostRequestEntityHost2NotMatched.setId(2L); + hostRequestEntityHost2NotMatched.setTopologyHostGroupEntity(hostGroupEntity1); + hostRequestEntityHost2NotMatched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet()); + expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host2.getFqdn()))).andReturn(false).anyTimes(); + + + Collection<TopologyHostRequestEntity> reservedHostRequestEntities = ImmutableSet.of( + hostRequestEntityHost1Matched, + hostRequestEntityHost2NotMatched); + + hostGroupEntity1.setTopologyHostRequestEntities(reservedHostRequestEntities); + + TopologyRequestEntity topologyRequestEntity = new TopologyRequestEntity(); + topologyRequestEntity.setTopologyHostGroupEntities(Collections.singleton(hostGroupEntity1)); + + + expect(logicalRequestEntity.getTopologyRequestEntity()).andReturn(topologyRequestEntity).atLeastOnce(); + expect(logicalRequestEntity.getTopologyHostRequestEntities()).andReturn(reservedHostRequestEntities).atLeastOnce(); + expect(blueprint.getHostGroup(eq("host_group_1"))).andReturn(hostGroup1).atLeastOnce(); + expect(hostGroup1.containsMasterComponent()).andReturn(false).atLeastOnce(); + + replayAll(); + + // When + + LogicalRequest req = new LogicalRequest(requestId, replayedTopologyRequest, clusterTopology, logicalRequestEntity); + req.removePendingHostRequests(null); + + // Then + verifyAll(); + + Collection<HostRequest> hostRequests = req.getHostRequests(); + assertEquals(1, hostRequests.size()); + + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/93d5fbed/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java index 0c1a13d..587d0af 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java @@ -396,7 +396,7 @@ public class TopologyManagerTest { Map<ClusterTopology, List<LogicalRequest>> allRequests = new HashMap<>(); List<LogicalRequest> requestList = new ArrayList<>(); requestList.add(logicalRequest); - expect(logicalRequest.hasCompleted()).andReturn(true).anyTimes(); + expect(logicalRequest.hasPendingHostRequests()).andReturn(false).anyTimes(); allRequests.put(clusterTopologyMock, requestList); expect(requestStatusResponse.getTasks()).andReturn(Collections.<ShortTaskStatus>emptyList()).anyTimes(); expect(clusterTopologyMock.isClusterKerberosEnabled()).andReturn(true); @@ -405,8 +405,8 @@ public class TopologyManagerTest { expect(persistedState.getAllRequests()).andReturn(allRequests).anyTimes(); expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); expect(ambariContext.isTopologyResolved(CLUSTER_ID)).andReturn(true).anyTimes(); - expect(group1.addComponent("KERBEROS_CLIENT")).andReturn(true); - expect(group2.addComponent("KERBEROS_CLIENT")).andReturn(true); + expect(group1.addComponent("KERBEROS_CLIENT")).andReturn(true).anyTimes(); + expect(group2.addComponent("KERBEROS_CLIENT")).andReturn(true).anyTimes(); replayAll(); @@ -515,7 +515,8 @@ public class TopologyManagerTest { allRequests.put(clusterTopologyMock, logicalRequests); expect(persistedState.getAllRequests()).andReturn(allRequests).anyTimes(); expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); - expect(logicalRequest.hasCompleted()).andReturn(true).anyTimes(); + expect(logicalRequest.hasPendingHostRequests()).andReturn(true).anyTimes(); + expect(logicalRequest.getCompletedHostRequests()).andReturn(Collections.EMPTY_LIST).anyTimes(); expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); replayAll(); EasyMock.replay(clusterTopologyMock);