Repository: ambari Updated Branches: refs/heads/trunk e7961091f -> 11a7651c8
AMBARI-19929. TopologyRequest/TopologyLogicalRequest/TopologyHostRequest database inconsistency. (stoader) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/11a7651c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/11a7651c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/11a7651c Branch: refs/heads/trunk Commit: 11a7651c8f06c05155ae0a5de249f0906bf370a4 Parents: e796109 Author: Toader, Sebastian <stoa...@hortonworks.com> Authored: Sat Feb 25 12:42:31 2017 +0100 Committer: Toader, Sebastian <stoa...@hortonworks.com> Committed: Sat Feb 25 12:42:31 2017 +0100 ---------------------------------------------------------------------- .../apache/ambari/server/state/ServiceImpl.java | 10 -- .../server/state/cluster/ClusterImpl.java | 3 +- .../ambari/server/topology/TopologyManager.java | 131 ++++++++++++++----- 3 files changed, 98 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/11a7651c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java index 2cccb21..31c53d8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java @@ -418,7 +418,6 @@ public class ServiceImpl implements Service { */ private void persist(ClusterServiceEntity serviceEntity) { persistEntities(serviceEntity); - refresh(); // publish the service installed event StackId stackId = cluster.getDesiredStackVersion(); @@ -441,15 +440,6 @@ public class ServiceImpl implements Service { clusterServiceDAO.merge(serviceEntity); } - @Transactional - public void refresh() { - ClusterServiceEntityPK pk = new ClusterServiceEntityPK(); - pk.setClusterId(getClusterId()); - pk.setServiceName(getName()); - ClusterServiceEntity serviceEntity = getServiceEntity(); - clusterServiceDAO.refresh(serviceEntity); - serviceDesiredStateDAO.refresh(serviceEntity.getServiceDesiredStateEntity()); - } @Override public boolean canBeRemoved() { http://git-wip-us.apache.org/repos/asf/ambari/blob/11a7651c/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index ab56844..739fe23 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -146,6 +146,7 @@ import org.apache.ambari.server.state.scheduler.RequestExecutionFactory; import org.apache.ambari.server.state.stack.upgrade.Direction; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostSummary; import org.apache.ambari.server.topology.TopologyRequest; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -447,7 +448,7 @@ public class ClusterImpl implements Cluster { private void loadServices() { ClusterEntity clusterEntity = getClusterEntity(); - if (clusterEntity.getClusterServiceEntities().isEmpty()) { + if (CollectionUtils.isEmpty(clusterEntity.getClusterServiceEntities())) { return; } http://git-wip-us.apache.org/repos/asf/ambari/blob/11a7651c/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index a26624e..f53f04a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -44,6 +44,7 @@ import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RequestStatusResponse; import org.apache.ambari.server.controller.ShortTaskStatus; import org.apache.ambari.server.controller.internal.ArtifactResourceProvider; +import org.apache.ambari.server.controller.internal.BaseClusterRequest; import org.apache.ambari.server.controller.internal.CalculatedStatus; import org.apache.ambari.server.controller.internal.CredentialResourceProvider; import org.apache.ambari.server.controller.internal.ProvisionClusterRequest; @@ -77,6 +78,7 @@ import org.slf4j.LoggerFactory; import com.google.common.eventbus.Subscribe; import com.google.inject.Singleton; +import com.google.inject.persist.Transactional; /** * Manages all cluster provisioning actions on the cluster topology. @@ -246,20 +248,18 @@ public class TopologyManager { public RequestStatusResponse provisionCluster(final ProvisionClusterRequest request) throws InvalidTopologyException, AmbariException { ensureInitialized(); - if (null != request.getQuickLinksProfileJson()) { - saveOrUpdateQuickLinksProfile(request.getQuickLinksProfileJson()); - } - - ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request); + final ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request); final String clusterName = request.getClusterName(); + final Stack stack = topology.getBlueprint().getStack(); final String repoVersion = request.getRepositoryVersion(); // get the id prior to creating ambari resources which increments the counter - Long provisionId = ambariContext.getNextRequestId(); + final Long provisionId = ambariContext.getNextRequestId(); - final Stack stack = topology.getBlueprint().getStack(); boolean configureSecurity = false; + SecurityConfiguration securityConfiguration = processSecurityConfiguration(request); + if (securityConfiguration != null && securityConfiguration.getType() == SecurityType.KERBEROS) { configureSecurity = true; addKerberosClient(topology); @@ -291,18 +291,25 @@ public class TopologyManager { topology.setConfigRecommendationStrategy(request.getConfigRecommendationStrategy()); // set provision action requested topology.setProvisionAction(request.getProvisionAction()); - // persist request after it has successfully validated - PersistedTopologyRequest persistedRequest = RetryHelper.executeWithRetry(new Callable<PersistedTopologyRequest>() { - @Override - public PersistedTopologyRequest call() throws Exception { - return persistedState.persistTopologyRequest(request); + + // persist request + LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() { + @Override + public LogicalRequest call() throws Exception { + LogicalRequest logicalRequest = processAndPersistProvisionClusterTopologyRequest(request, topology, provisionId); + return logicalRequest; + } } - }); + ); + clusterTopologyMap.put(clusterId, topology); addClusterConfigRequest(topology, new ClusterConfigurationRequest( ambariContext, topology, true, stackAdvisorBlueprintProcessor, configureSecurity)); + + + // Notify listeners that cluster configuration finished executor.submit(new Callable<Boolean>() { @Override public Boolean call() throws Exception { @@ -310,7 +317,9 @@ public class TopologyManager { return Boolean.TRUE; } }); - LogicalRequest logicalRequest = processRequest(persistedRequest, topology, provisionId); + + // Process the logical request + processRequest(request, topology, logicalRequest); //todo: this should be invoked as part of a generic lifecycle event which could possibly //todo: be tied to cluster state @@ -320,6 +329,7 @@ public class TopologyManager { return getRequestStatus(logicalRequest.getRequestId()); } + /** * Saves the quick links profile to the DB as an Ambari setting. Creates a new setting entity or updates the existing * one. @@ -435,14 +445,14 @@ public class TopologyManager { } - public RequestStatusResponse scaleHosts(ScaleClusterRequest request) + public RequestStatusResponse scaleHosts(final ScaleClusterRequest request) throws InvalidTopologyException, AmbariException { ensureInitialized(); LOG.info("TopologyManager.scaleHosts: Entering"); String clusterName = request.getClusterName(); long clusterId = ambariContext.getClusterId(clusterName); - ClusterTopology topology = clusterTopologyMap.get(clusterId); + final ClusterTopology topology = clusterTopologyMap.get(clusterId); if (topology == null) { throw new InvalidTopologyException("Unable to retrieve cluster topology for cluster. This is most likely a " + "result of trying to scale a cluster via the API which was created using " + @@ -453,11 +463,64 @@ public class TopologyManager { hostNameCheck(request, topology); request.setClusterId(clusterId); - PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request); + // this registers/updates all request host groups topology.update(request); - return getRequestStatus(processRequest(persistedRequest, topology, - ambariContext.getNextRequestId()).getRequestId()); + + final Long requestId = ambariContext.getNextRequestId(); + LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() { + @Override + public LogicalRequest call() throws Exception { + LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId); + + return logicalRequest; + } + } + ); + + processRequest(request, topology, logicalRequest); + + return getRequestStatus(logicalRequest.getRequestId()); + } + + /** + * Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided + * provision cluster request and topology. + * @param request Provision cluster request to create a logical request for. + * @param topology Cluster topology + * @param logicalRequestId The Id for the created logical request + * @return Logical request created. + */ + @Transactional + protected LogicalRequest processAndPersistProvisionClusterTopologyRequest(ProvisionClusterRequest request, ClusterTopology topology, Long logicalRequestId) + throws InvalidTopologyException, AmbariException { + + if (null != request.getQuickLinksProfileJson()) { + saveOrUpdateQuickLinksProfile(request.getQuickLinksProfileJson()); + } + + LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, logicalRequestId); + + return logicalRequest; + + } + + + /** + * Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided request and topology. + * @param request {@see ProvisionClusterRequest} or {@see ScaleClusterRequest} to create a logical request for. + * @param topology Cluster topology + * @param logicalRequestId The Id for the created logical request + * @return Logical request created. + */ + @Transactional + protected LogicalRequest processAndPersistTopologyRequest(BaseClusterRequest request, ClusterTopology topology, Long logicalRequestId) + throws InvalidTopologyException, AmbariException { + PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request); + + LogicalRequest logicalRequest = createLogicalRequest(persistedRequest, topology, logicalRequestId); + + return logicalRequest; } private void hostNameCheck(ScaleClusterRequest request, ClusterTopology topology) throws InvalidTopologyException { @@ -680,13 +743,12 @@ public class TopologyManager { return hostComponentMap; } - private LogicalRequest processRequest(PersistedTopologyRequest request, ClusterTopology topology, Long requestId) + private void processRequest(TopologyRequest request, ClusterTopology topology, final LogicalRequest logicalRequest) throws AmbariException { LOG.info("TopologyManager.processRequest: Entering"); - finalizeTopology(request.getRequest(), topology); - LogicalRequest logicalRequest = createLogicalRequest(request, topology, requestId); + finalizeTopology(request, topology); boolean requestHostComplete = false; //todo: overall synchronization. Currently we have nested synchronization here @@ -750,22 +812,16 @@ public class TopologyManager { } } } - return logicalRequest; } - private LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId) + @Transactional + protected LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId) throws AmbariException { final LogicalRequest logicalRequest = logicalRequestFactory.createRequest( requestId, request.getRequest(), topology); - RetryHelper.executeWithRetry(new Callable<Object>() { - @Override - public Object call() throws Exception { - persistedState.persistLogicalRequest(logicalRequest, request.getId()); - return null; - } - }); + persistedState.persistLogicalRequest(logicalRequest, request.getId()); allRequests.put(logicalRequest.getRequestId(), logicalRequest); LOG.info("TopologyManager.createLogicalRequest: created LogicalRequest with ID = {} and completed persistence of this request.", @@ -796,11 +852,10 @@ public class TopologyManager { // persist the host request -> hostName association try { - RetryHelper.executeWithRetry(new Callable<Object>() { + RetryHelper.executeWithRetry(new Callable<Void>() { @Override - public Object call() throws Exception { - persistedState.registerHostName(response.getHostRequestId(), hostName); - persistedState.registerInTopologyHostInfo(host); + public Void call() throws Exception { + persistTopologyHostRegistration(response.getHostRequestId(), host); return null; } }); @@ -822,6 +877,12 @@ public class TopologyManager { } } + @Transactional + protected void persistTopologyHostRegistration(long hostRequestId, final HostImpl host) { + persistedState.registerHostName(hostRequestId, host.getHostName()); + persistedState.registerInTopologyHostInfo(host); + } + private void queueHostTasks(ClusterTopology topology, HostOfferResponse response, String hostName) { LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for host = {}", hostName); response.executeTasks(taskExecutor, hostName, topology, ambariContext);