Repository: ambari Updated Branches: refs/heads/trunk 913bf4ed6 -> 923bf81ae
AMBARI-9738 - Ambari Server Deadlocks During Cluster Provisioning (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/923bf81a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/923bf81a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/923bf81a Branch: refs/heads/trunk Commit: 923bf81ae125ac6581cef053aa2c31baca1dcb65 Parents: 913bf4e Author: Jonathan Hurley <jhur...@hortonworks.com> Authored: Sat Feb 21 16:55:24 2015 -0500 Committer: Jonathan Hurley <jhur...@hortonworks.com> Committed: Mon Feb 23 10:14:29 2015 -0500 ---------------------------------------------------------------------- .../server/state/cluster/ClusterImpl.java | 105 ++++---- .../server/state/cluster/ClustersImpl.java | 151 ++++++------ .../state/cluster/ClustersDeadlockTest.java | 239 +++++++++++++++++++ 3 files changed, 374 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/923bf81a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 8589182..c609b0a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -92,9 +92,7 @@ import org.apache.ambari.server.state.ConfigVersionHelper; import org.apache.ambari.server.state.DesiredConfig; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.HostHealthStatus; -import org.apache.ambari.server.state.HostState; import org.apache.ambari.server.state.MaintenanceState; -import 
org.apache.ambari.server.state.PropertyInfo; import org.apache.ambari.server.state.RepositoryVersionState; import org.apache.ambari.server.state.SecurityType; import org.apache.ambari.server.state.Service; @@ -616,23 +614,25 @@ public class ClusterImpl implements Cluster { } } - public void addServiceComponentHost( - ServiceComponentHost svcCompHost) throws AmbariException { + public void addServiceComponentHost(ServiceComponentHost svcCompHost) + throws AmbariException { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to add component {} of service {} on {} to the cache", + svcCompHost.getServiceComponentName(), svcCompHost.getServiceName(), + svcCompHost.getHostName()); + } + loadServiceHostComponents(); + + final String hostname = svcCompHost.getHostName(); + final String serviceName = svcCompHost.getServiceName(); + final String componentName = svcCompHost.getServiceComponentName(); + + Set<Cluster> cs = clusters.getClustersForHost(hostname); + clusterGlobalLock.writeLock().lock(); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache" - + ", serviceName=" + svcCompHost.getServiceName() - + ", componentName=" + svcCompHost.getServiceComponentName() - + ", hostname=" + svcCompHost.getHostName()); - } - - final String hostname = svcCompHost.getHostName(); - final String serviceName = svcCompHost.getServiceName(); - final String componentName = svcCompHost.getServiceComponentName(); - Set<Cluster> cs = clusters.getClustersForHost(hostname); boolean clusterFound = false; Iterator<Cluster> iter = cs.iterator(); while (iter.hasNext()) { @@ -642,6 +642,7 @@ public class ClusterImpl implements Cluster { break; } } + if (!clusterFound) { throw new AmbariException("Host does not belong this cluster" + ", hostname=" + hostname + ", clusterName=" + getClusterName() @@ -652,6 +653,7 @@ public class ClusterImpl implements Cluster { serviceComponentHosts.put(serviceName, new HashMap<String, Map<String, 
ServiceComponentHost>>()); } + if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) { serviceComponentHosts.get(serviceName).put(componentName, new HashMap<String, ServiceComponentHost>()); @@ -687,22 +689,22 @@ public class ClusterImpl implements Cluster { @Override public void removeServiceComponentHost(ServiceComponentHost svcCompHost) throws AmbariException { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Trying to remove component {} of service {} on {} from the cache", + svcCompHost.getServiceComponentName(), svcCompHost.getServiceName(), + svcCompHost.getHostName()); + } + loadServiceHostComponents(); + + final String hostname = svcCompHost.getHostName(); + final String serviceName = svcCompHost.getServiceName(); + final String componentName = svcCompHost.getServiceComponentName(); + Set<Cluster> cs = clusters.getClustersForHost(hostname); + clusterGlobalLock.writeLock().lock(); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to remove ServiceComponentHost to ClusterHostMap cache" - + ", serviceName=" - + svcCompHost.getServiceName() - + ", componentName=" - + svcCompHost.getServiceComponentName() - + ", hostname=" + svcCompHost.getHostName()); - } - - final String hostname = svcCompHost.getHostName(); - final String serviceName = svcCompHost.getServiceName(); - final String componentName = svcCompHost.getServiceComponentName(); - Set<Cluster> cs = clusters.getClustersForHost(hostname); boolean clusterFound = false; Iterator<Cluster> iter = cs.iterator(); while (iter.hasNext()) { @@ -727,6 +729,7 @@ public class ClusterImpl implements Cluster { + ", serviceName=" + serviceName + ", serviceComponentName" + componentName + ", hostname= " + hostname); } + if (!serviceComponentHostsByHost.containsKey(hostname)) { throw new AmbariException("Invalid host entry for ServiceComponentHost" + ", serviceName=" + serviceName + ", serviceComponentName" @@ -1068,6 +1071,7 @@ public class ClusterImpl implements Cluster { RepositoryVersionState 
desiredState = sourceClusterVersion.getState(); + @SuppressWarnings("serial") Set<RepositoryVersionState> validStates = new HashSet<RepositoryVersionState>(){{ add(RepositoryVersionState.INSTALLING); }}; @@ -1076,14 +1080,17 @@ public class ClusterImpl implements Cluster { throw new AmbariException("The state must be one of " + validStates); } + Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName()); + Set<String> existingHostsWithClusterStackAndVersion = new HashSet<String>(); + HashMap<String, HostVersionEntity> existingHostStackVersions = new HashMap<String, HostVersionEntity>(); + clusterGlobalLock.writeLock().lock(); try { - Set<String> existingHostsWithClusterStackAndVersion = new HashSet<String>(); - HashMap<String, HostVersionEntity> existingHostStackVersions = new HashMap<String, HostVersionEntity>(); List<HostVersionEntity> existingHostVersionEntities = hostVersionDAO.findByClusterStackAndVersion( getClusterName(), sourceClusterVersion.getRepositoryVersion().getStack(), sourceClusterVersion.getRepositoryVersion().getVersion()); + if (existingHostVersionEntities != null) { for (HostVersionEntity entity : existingHostVersionEntities) { existingHostsWithClusterStackAndVersion.add(entity.getHostName()); @@ -1091,8 +1098,6 @@ public class ClusterImpl implements Cluster { } } - Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName()); - Sets.SetView<String> hostsMissingRepoVersion = Sets.difference( hosts.keySet(), existingHostsWithClusterStackAndVersion); @@ -1177,6 +1182,8 @@ public class ClusterImpl implements Cluster { return; } + Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName()); + clusterGlobalLock.writeLock().lock(); try { // Part 1, bootstrap cluster version if necessary. @@ -1215,14 +1222,13 @@ public class ClusterImpl implements Cluster { } // Part 2, check for transitions. 
- Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName()); - Set<Host> hostsWithoutHostVersion = new HashSet<Host>(); Map<RepositoryVersionState, Set<String>> stateToHosts = new HashMap<RepositoryVersionState, Set<String>>(); for (Host host : hosts.values()) { String hostName = host.getHostName(); HostVersionEntity hostVersion = hostVersionDAO.findByClusterStackVersionAndHost( getClusterName(), stackId.getStackId(), repositoryVersion, hostName); + if (hostVersion == null) { // This host either has not had a chance to heartbeat yet with its // installed component, or it has components @@ -1599,14 +1605,14 @@ public class ClusterImpl implements Cluster { @Override public ClusterResponse convertToResponse() throws AmbariException { + String clusterName = getClusterName(); + Map<String, Host> hosts = clusters.getHostsForCluster(clusterName); clusterGlobalLock.readLock().lock(); try { - Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName()); - - return new ClusterResponse(getClusterId(), getClusterName(), + return new ClusterResponse(getClusterId(), clusterName, getProvisioningState(), getSecurityType(), hosts.keySet(), hosts.size(), getDesiredStackVersion().getStackId(), - getClusterHealthReport()); + getClusterHealthReport(hosts)); } finally { clusterGlobalLock.readLock().unlock(); } @@ -2356,7 +2362,8 @@ public class ClusterImpl implements Cluster { return components.get(componentName).getServiceComponentHosts().keySet(); } - private ClusterHealthReport getClusterHealthReport() throws AmbariException { + private ClusterHealthReport getClusterHealthReport( + Map<String, Host> clusterHosts) throws AmbariException { int staleConfigsHosts = 0; int maintenanceStateHosts = 0; @@ -2371,17 +2378,11 @@ public class ClusterImpl implements Cluster { int alertStatusHosts = 0; int heartbeatLostStateHosts = 0; - Set<String> hostnames; - - try { - hostnames = clusters.getHostsForCluster(clusterEntity.getClusterName()).keySet(); - } catch 
(AmbariException ignored) { - hostnames = Collections.emptySet(); - } - - for (String hostname : hostnames) { - - Host host = clusters.getHost(hostname); + Collection<Host> hosts = clusterHosts.values(); + Iterator<Host> iterator = hosts.iterator(); + while (iterator.hasNext()) { + Host host = iterator.next(); + String hostName = host.getHostName(); switch (host.getState()) { case HEALTHY: @@ -2416,8 +2417,8 @@ public class ClusterImpl implements Cluster { boolean staleConfig = false; boolean maintenanceState = false; - if (serviceComponentHostsByHost.containsKey(hostname)) { - for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) { + if (serviceComponentHostsByHost.containsKey(hostName)) { + for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostName)) { staleConfig = staleConfig || configHelper.isStaleConfigs(sch); maintenanceState = maintenanceState || maintenanceStateHelper.getEffectiveState(sch) != MaintenanceState.OFF; http://git-wip-us.apache.org/repos/asf/ambari/blob/923bf81a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java index 97ac2be..1c16ce3 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.persistence.RollbackException; @@ -67,7 +68,6 @@ import org.apache.ambari.server.state.HostHealthStatus; import 
org.apache.ambari.server.state.HostHealthStatus.HealthStatus; import org.apache.ambari.server.state.HostState; import org.apache.ambari.server.state.RepositoryInfo; -import org.apache.ambari.server.state.RepositoryVersionState; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.configgroup.ConfigGroup; import org.apache.ambari.server.state.host.HostFactory; @@ -75,7 +75,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.core.GrantedAuthority; -import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -97,7 +96,7 @@ public class ClustersImpl implements Clusters { private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); - volatile boolean clustersLoaded = false; + private volatile boolean clustersLoaded = false; @Inject ClusterDAO clusterDAO; @@ -141,22 +140,24 @@ public class ClustersImpl implements Clusters { LOG.info("Initializing the ClustersImpl"); } - void checkLoaded() { - if (!clustersLoaded) { - w.lock(); - try { - if (!clustersLoaded) { - loadClustersAndHosts(); - } - clustersLoaded = true; - } finally { - w.unlock(); + private void checkLoaded() { + if (clustersLoaded) { + return; + } + + w.lock(); + try { + if (!clustersLoaded) { + loadClustersAndHosts(); } + clustersLoaded = true; + } finally { + w.unlock(); } } @Transactional - void loadClustersAndHosts() { + private void loadClustersAndHosts() { for (ClusterEntity clusterEntity : clusterDAO.findAll()) { Cluster currentCluster = clusterFactory.create(clusterEntity); clusters.put(clusterEntity.getClusterName(), currentCluster); @@ -175,7 +176,6 @@ public class ClustersImpl implements Clusters { cSet.add(clusters.get(clusterEntity.getClusterName())); } } - } @Override @@ -183,11 +183,6 @@ public class ClustersImpl implements Clusters { throws AmbariException { checkLoaded(); - if (clusters.containsKey(clusterName)) { - 
throw new DuplicateResourceException("Attempted to create a Cluster which already exists" - + ", clusterName=" + clusterName); - } - w.lock(); try { if (clusters.containsKey(clusterName)) { @@ -203,6 +198,7 @@ public class ClustersImpl implements Clusters { resourceTypeEntity.setName(ResourceTypeEntity.CLUSTER_RESOURCE_TYPE_NAME); resourceTypeEntity = resourceTypeDAO.merge(resourceTypeEntity); } + ResourceEntity resourceEntity = new ResourceEntity(); resourceEntity.setResourceType(resourceTypeEntity); @@ -348,12 +344,13 @@ public class ClustersImpl implements Clusters { throw new AmbariException(duplicateMessage); } - r.lock(); + w.lock(); try { HostEntity hostEntity = new HostEntity(); hostEntity.setHostName(hostname); hostEntity.setClusterEntities(new ArrayList<ClusterEntity>()); + //not stored to DB Host host = hostFactory.create(hostEntity, false); host.setAgentVersion(new AgentVersion("")); @@ -370,7 +367,7 @@ public class ClustersImpl implements Clusters { + ", hostname=" + hostname); } } finally { - r.unlock(); + w.unlock(); } // publish the event @@ -386,11 +383,12 @@ public class ClustersImpl implements Clusters { } @Override - public void updateHostWithClusterAndAttributes(Map<String, Set<String>> hostClusters, Map<String, - Map<String, String>> hostAttributes) - throws AmbariException { + public void updateHostWithClusterAndAttributes( + Map<String, Set<String>> hostClusters, + Map<String, Map<String, String>> hostAttributes) throws AmbariException { checkLoaded(); w.lock(); + try { if (hostClusters != null) { Map<String, Host> hostMap = getHostsMap(hostClusters.keySet()); @@ -398,7 +396,6 @@ public class ClustersImpl implements Clusters { for (Set<String> cSet : hostClusters.values()) { clusterNames.addAll(cSet); } - Map<String, Cluster> clusterMap = getClustersMap(clusterNames); for (String hostname : hostClusters.keySet()) { Host host = hostMap.get(hostname); @@ -406,8 +403,8 @@ public class ClustersImpl implements Clusters { if (attributes != null && 
!attributes.isEmpty()){ host.setHostAttributes(attributes); } - host.refresh(); + host.refresh(); Set<String> hostClusterNames = hostClusters.get(hostname); for (String clusterName : hostClusterNames) { @@ -492,48 +489,61 @@ public class ClustersImpl implements Clusters { * @param currentClusterVersion Cluster's current stack version * @throws AmbariException May throw a DuplicateResourceException. */ - public void mapHostToCluster(String hostname, String clusterName, ClusterVersionEntity currentClusterVersion) throws AmbariException { + public void mapHostToCluster(String hostname, String clusterName, + ClusterVersionEntity currentClusterVersion) throws AmbariException { + Host host = null; + Cluster cluster = null; + checkLoaded(); - w.lock(); + r.lock(); try { - Host host = getHost(hostname); - Cluster cluster = getCluster(clusterName); + host = getHost(hostname); + cluster = getCluster(clusterName); + // check to ensure there are no duplicates for (Cluster c : hostClusterMap.get(hostname)) { if (c.getClusterName().equals(clusterName)) { throw new DuplicateResourceException("Attempted to create a host which already exists: clusterName=" + clusterName + ", hostName=" + hostname); } } + } finally { + r.unlock(); + } - if (!isOsSupportedByClusterStack(cluster, host)) { - String message = "Trying to map host to cluster where stack does not" - + " support host's os type" - + ", clusterName=" + clusterName - + ", clusterStackId=" + cluster.getDesiredStackVersion().getStackId() - + ", hostname=" + hostname - + ", hostOsFamily=" + host.getOsFamily(); - LOG.warn(message); - throw new AmbariException(message); - } + if (!isOsSupportedByClusterStack(cluster, host)) { + String message = "Trying to map host to cluster where stack does not" + + " support host's os type" + ", clusterName=" + clusterName + + ", clusterStackId=" + cluster.getDesiredStackVersion().getStackId() + + ", hostname=" + hostname + ", hostOsFamily=" + host.getOsFamily(); + LOG.warn(message); + throw new 
AmbariException(message); + } - mapHostClusterEntities(hostname, cluster.getClusterId()); + long clusterId = cluster.getClusterId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Mapping host {} to cluster {} (id={})", hostname, clusterName, + clusterId); + } - host.refresh(); - cluster.refresh(); + w.lock(); + try { + mapHostClusterEntities(hostname, clusterId); hostClusterMap.get(hostname).add(cluster); clusterHostMap.get(clusterName).add(host); - - if (LOG.isDebugEnabled()) { - LOG.debug("Mapping a host to a cluster" - + ", clusterName=" + clusterName - + ", clusterId=" + cluster.getClusterId() - + ", hostname=" + hostname); - } } finally { w.unlock(); } + + ReadWriteLock clusterLock = cluster.getClusterGlobalLock(); + clusterLock.writeLock().lock(); + try { + host.refresh(); + cluster.refresh(); + } finally { + clusterLock.writeLock().unlock(); + } } /** @@ -544,8 +554,8 @@ public class ClustersImpl implements Clusters { * @throws AmbariException May throw a DuplicateResourceException. 
*/ @Override - public void mapHostToCluster(String hostname, - String clusterName) throws AmbariException { + public void mapHostToCluster(String hostname, String clusterName) + throws AmbariException { checkLoaded(); ClusterVersionEntity clusterVersionEntity = clusterVersionDAO.findByClusterAndStateCurrent(clusterName); @@ -616,9 +626,8 @@ public class ClustersImpl implements Clusters { @Override public Map<String, Host> getHostsForCluster(String clusterName) throws AmbariException { - if (!clustersLoaded) { - checkLoaded(); - } + + checkLoaded(); r.lock(); try { @@ -668,28 +677,33 @@ public class ClustersImpl implements Clusters { @Override public void unmapHostFromCluster(String hostname, String clusterName) throws AmbariException { + Host host = null; + Cluster cluster = null; checkLoaded(); - w.lock(); - + r.lock(); try { - Host host = getHost(hostname); - Cluster cluster = getCluster(clusterName); + host = getHost(hostname); + cluster = getCluster(clusterName); + } finally { + r.unlock(); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Unmapping a host from a cluster" - + ", clusterName=" + clusterName - + ", clusterId=" + cluster.getClusterId() - + ", hostname=" + hostname); - } + long clusterId = cluster.getClusterId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Unmapping host {} from cluster {} (id={})", hostname, + clusterName, clusterId); + } + + w.lock(); + try { unmapHostClusterEntities(hostname, cluster.getClusterId()); - host.refresh(); - cluster.refresh(); hostClusterMap.get(hostname).remove(cluster); clusterHostMap.get(clusterName).remove(host); + host.refresh(); cluster.refresh(); @@ -700,7 +714,6 @@ public class ClustersImpl implements Clusters { } finally { w.unlock(); } - } @Transactional http://git-wip-us.apache.org/repos/asf/ambari/blob/923bf81a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java new file mode 100644 index 0000000..3a2773f --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java @@ -0,0 +1,239 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.ambari.server.state.cluster; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Assert; + +import org.apache.ambari.server.api.services.AmbariMetaInfo; +import org.apache.ambari.server.orm.GuiceJpaInitializer; +import org.apache.ambari.server.orm.InMemoryDefaultTestModule; +import org.apache.ambari.server.orm.OrmTestHelper; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Host; +import org.apache.ambari.server.state.RepositoryVersionState; +import org.apache.ambari.server.state.StackId; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.persist.PersistService; + +/** + * Tests AMBARI-9738 which produced a deadlock during read and writes between + * {@link ClustersImpl} and {@link ClusterImpl}. 
+ */ +public class ClustersDeadlockTest { + private static final String CLUSTER_NAME = "c1"; + private static final int NUMBER_OF_HOSTS = 100; + private static final int NUMBER_OF_THREADS = 3; + + private final AtomicInteger hostNameCounter = new AtomicInteger(0); + + @Inject + private Injector injector; + + @Inject + private Clusters clusters; + + @Inject + private AmbariMetaInfo metaInfo; + + @Inject + private OrmTestHelper helper; + + private Cluster cluster; + + @Before + public void setup() throws Exception { + injector = Guice.createInjector(new InMemoryDefaultTestModule()); + injector.getInstance(GuiceJpaInitializer.class); + injector.injectMembers(this); + clusters.addCluster(CLUSTER_NAME); + + StackId stackId = new StackId("HDP-0.1"); + cluster = clusters.getCluster(CLUSTER_NAME); + cluster.setDesiredStackVersion(stackId); + helper.getOrCreateRepositoryVersion(stackId.getStackName(), stackId.getStackVersion()); + cluster.createClusterVersion(stackId.getStackName(), stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING); + metaInfo.init(); + } + + @After + public void teardown() { + injector.getInstance(PersistService.class).stop(); + } + + /** + * Tests that no deadlock exists when adding hosts while reading from the + * cluster.
+ * + * @throws Exception + */ + @Test(timeout = 35000) + public void testDeadlockWhileMappingHosts() throws Exception { + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < NUMBER_OF_THREADS; i++) { + ClusterReaderThread readerThread = new ClusterReaderThread(); + ClustersHostMapperThread writerThread = new ClustersHostMapperThread(); + + threads.add(readerThread); + threads.add(writerThread); + + readerThread.start(); + writerThread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + Assert.assertEquals(NUMBER_OF_THREADS * NUMBER_OF_HOSTS, + clusters.getHostsForCluster(CLUSTER_NAME).size()); + } + + /** + * Tests that no deadlock exists when unmapping hosts while reading from the + * cluster. + * + * @throws Exception + */ + @Test(timeout = 35000) + public void testDeadlockWhileUnmappingHosts() throws Exception { + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < NUMBER_OF_THREADS; i++) { + ClusterReaderThread readerThread = new ClusterReaderThread(); + ClustersHostUnMapperThread writerThread = new ClustersHostUnMapperThread(); + + threads.add(readerThread); + threads.add(writerThread); + + readerThread.start(); + writerThread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + Assert.assertEquals(0, + clusters.getHostsForCluster(CLUSTER_NAME).size()); + } + + /** + * The {@link ClusterReaderThread} reads from a cluster over and over again + * with a slight pause. + */ + private final class ClusterReaderThread extends Thread { + + /** + * {@inheritDoc} + */ + @Override + public void run() { + try { + for (int i = 0; i < 1000; i++) { + cluster.convertToResponse(); + Thread.sleep(10); + } + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + } + + /** + * The {@link ClustersHostMapperThread} is used to map hosts to a cluster over + * and over.
+ */ + private final class ClustersHostMapperThread extends Thread { + + /** + * {@inheritDoc} + */ + @Override + public void run() { + try { + for (int i = 0; i < NUMBER_OF_HOSTS; i++) { + String hostName = "c64-" + hostNameCounter.getAndIncrement(); + clusters.addHost(hostName); + setOsFamily(clusters.getHost(hostName), "redhat", "6.4"); + clusters.getHost(hostName).persist(); + clusters.mapHostToCluster(hostName, CLUSTER_NAME); + + Thread.sleep(10); + } + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + } + + /** + * The {@link ClustersHostUnMapperThread} is used to unmap hosts from a cluster + * over and over. + */ + private final class ClustersHostUnMapperThread extends Thread { + + /** + * {@inheritDoc} + */ + @Override + public void run() { + List<String> hostNames = new ArrayList<String>(100); + try { + // pre-map the hosts + for (int i = 0; i < NUMBER_OF_HOSTS; i++) { + String hostName = "c64-" + hostNameCounter.getAndIncrement(); + hostNames.add(hostName); + + clusters.addHost(hostName); + setOsFamily(clusters.getHost(hostName), "redhat", "6.4"); + clusters.getHost(hostName).persist(); + clusters.mapHostToCluster(hostName, CLUSTER_NAME); + } + + // unmap them all now + for (String hostName : hostNames) { + clusters.unmapHostFromCluster(hostName, CLUSTER_NAME); + Thread.sleep(10); + } + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + } + + + private void setOsFamily(Host host, String osFamily, String osVersion) { + Map<String, String> hostAttributes = new HashMap<String, String>(2); + hostAttributes.put("os_family", osFamily); + hostAttributes.put("os_release_version", osVersion); + host.setHostAttributes(hostAttributes); + } +}