Repository: ambari
Updated Branches:
  refs/heads/trunk 913bf4ed6 -> 923bf81ae


AMBARI-9738 - Ambari Server Deadlocks During Cluster Provisioning (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/923bf81a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/923bf81a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/923bf81a

Branch: refs/heads/trunk
Commit: 923bf81ae125ac6581cef053aa2c31baca1dcb65
Parents: 913bf4e
Author: Jonathan Hurley <jhur...@hortonworks.com>
Authored: Sat Feb 21 16:55:24 2015 -0500
Committer: Jonathan Hurley <jhur...@hortonworks.com>
Committed: Mon Feb 23 10:14:29 2015 -0500

----------------------------------------------------------------------
 .../server/state/cluster/ClusterImpl.java       | 105 ++++----
 .../server/state/cluster/ClustersImpl.java      | 151 ++++++------
 .../state/cluster/ClustersDeadlockTest.java     | 239 +++++++++++++++++++
 3 files changed, 374 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/923bf81a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 8589182..c609b0a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -92,9 +92,7 @@ import org.apache.ambari.server.state.ConfigVersionHelper;
 import org.apache.ambari.server.state.DesiredConfig;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostHealthStatus;
-import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.PropertyInfo;
 import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.Service;
@@ -616,23 +614,25 @@ public class ClusterImpl implements Cluster {
     }
   }
 
-  public void addServiceComponentHost(
-    ServiceComponentHost svcCompHost) throws AmbariException {
+  public void addServiceComponentHost(ServiceComponentHost svcCompHost)
+      throws AmbariException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to add component {} of service {} on {} to the cache",
+          svcCompHost.getServiceComponentName(), svcCompHost.getServiceName(),
+          svcCompHost.getHostName());
+    }
+
     loadServiceHostComponents();
+
+    final String hostname = svcCompHost.getHostName();
+    final String serviceName = svcCompHost.getServiceName();
+    final String componentName = svcCompHost.getServiceComponentName();
+
+    Set<Cluster> cs = clusters.getClustersForHost(hostname);
+
     clusterGlobalLock.writeLock().lock();
 
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache"
-            + ", serviceName=" + svcCompHost.getServiceName()
-            + ", componentName=" + svcCompHost.getServiceComponentName()
-            + ", hostname=" + svcCompHost.getHostName());
-      }
-
-      final String hostname = svcCompHost.getHostName();
-      final String serviceName = svcCompHost.getServiceName();
-      final String componentName = svcCompHost.getServiceComponentName();
-      Set<Cluster> cs = clusters.getClustersForHost(hostname);
       boolean clusterFound = false;
       Iterator<Cluster> iter = cs.iterator();
       while (iter.hasNext()) {
@@ -642,6 +642,7 @@ public class ClusterImpl implements Cluster {
           break;
         }
       }
+
       if (!clusterFound) {
         throw new AmbariException("Host does not belong this cluster"
             + ", hostname=" + hostname + ", clusterName=" + getClusterName()
@@ -652,6 +653,7 @@ public class ClusterImpl implements Cluster {
         serviceComponentHosts.put(serviceName,
             new HashMap<String, Map<String, ServiceComponentHost>>());
       }
+
       if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) {
         serviceComponentHosts.get(serviceName).put(componentName,
             new HashMap<String, ServiceComponentHost>());
@@ -687,22 +689,22 @@ public class ClusterImpl implements Cluster {
   @Override
   public void removeServiceComponentHost(ServiceComponentHost svcCompHost)
     throws AmbariException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Trying to remove component {} of service {} on {} from the cache",
+          svcCompHost.getServiceComponentName(), svcCompHost.getServiceName(),
+          svcCompHost.getHostName());
+    }
+
     loadServiceHostComponents();
+
+    final String hostname = svcCompHost.getHostName();
+    final String serviceName = svcCompHost.getServiceName();
+    final String componentName = svcCompHost.getServiceComponentName();
+    Set<Cluster> cs = clusters.getClustersForHost(hostname);
+
     clusterGlobalLock.writeLock().lock();
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Trying to remove ServiceComponentHost to ClusterHostMap cache"
-            + ", serviceName="
-            + svcCompHost.getServiceName()
-            + ", componentName="
-            + svcCompHost.getServiceComponentName()
-            + ", hostname=" + svcCompHost.getHostName());
-      }
-
-      final String hostname = svcCompHost.getHostName();
-      final String serviceName = svcCompHost.getServiceName();
-      final String componentName = svcCompHost.getServiceComponentName();
-      Set<Cluster> cs = clusters.getClustersForHost(hostname);
       boolean clusterFound = false;
       Iterator<Cluster> iter = cs.iterator();
       while (iter.hasNext()) {
@@ -727,6 +729,7 @@ public class ClusterImpl implements Cluster {
             + ", serviceName=" + serviceName + ", serviceComponentName"
             + componentName + ", hostname= " + hostname);
       }
+
       if (!serviceComponentHostsByHost.containsKey(hostname)) {
         throw new AmbariException("Invalid host entry for ServiceComponentHost"
             + ", serviceName=" + serviceName + ", serviceComponentName"
@@ -1068,6 +1071,7 @@ public class ClusterImpl implements Cluster {
 
     RepositoryVersionState desiredState = sourceClusterVersion.getState();
 
+    @SuppressWarnings("serial")
     Set<RepositoryVersionState> validStates = new HashSet<RepositoryVersionState>(){{
       add(RepositoryVersionState.INSTALLING);
     }};
@@ -1076,14 +1080,17 @@ public class ClusterImpl implements Cluster {
       throw new AmbariException("The state must be one of " + validStates);
     }
 
+    Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
+    Set<String> existingHostsWithClusterStackAndVersion = new HashSet<String>();
+    HashMap<String, HostVersionEntity> existingHostStackVersions = new HashMap<String, HostVersionEntity>();
+
     clusterGlobalLock.writeLock().lock();
     try {
-      Set<String> existingHostsWithClusterStackAndVersion = new HashSet<String>();
-      HashMap<String, HostVersionEntity> existingHostStackVersions = new HashMap<String, HostVersionEntity>();
       List<HostVersionEntity> existingHostVersionEntities = hostVersionDAO.findByClusterStackAndVersion(
           getClusterName(),
           sourceClusterVersion.getRepositoryVersion().getStack(),
           sourceClusterVersion.getRepositoryVersion().getVersion());
+
       if (existingHostVersionEntities != null) {
         for (HostVersionEntity entity : existingHostVersionEntities) {
           existingHostsWithClusterStackAndVersion.add(entity.getHostName());
@@ -1091,8 +1098,6 @@ public class ClusterImpl implements Cluster {
         }
       }
 
-      Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
-
       Sets.SetView<String> hostsMissingRepoVersion = Sets.difference(
           hosts.keySet(), existingHostsWithClusterStackAndVersion);
 
@@ -1177,6 +1182,8 @@ public class ClusterImpl implements Cluster {
       return;
     }
 
+    Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
+
     clusterGlobalLock.writeLock().lock();
     try {
       // Part 1, bootstrap cluster version if necessary.
@@ -1215,14 +1222,13 @@ public class ClusterImpl implements Cluster {
       }
 
       // Part 2, check for transitions.
-      Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
-
       Set<Host> hostsWithoutHostVersion = new HashSet<Host>();
       Map<RepositoryVersionState, Set<String>> stateToHosts = new HashMap<RepositoryVersionState, Set<String>>();
       for (Host host : hosts.values()) {
         String hostName = host.getHostName();
         HostVersionEntity hostVersion = hostVersionDAO.findByClusterStackVersionAndHost(
             getClusterName(), stackId.getStackId(), repositoryVersion, hostName);
+
         if (hostVersion == null) {
           // This host either has not had a chance to heartbeat yet with its
           // installed component, or it has components
@@ -1599,14 +1605,14 @@ public class ClusterImpl implements Cluster {
   @Override
   public ClusterResponse convertToResponse()
     throws AmbariException {
+    String clusterName = getClusterName();
+    Map<String, Host> hosts = clusters.getHostsForCluster(clusterName);
     clusterGlobalLock.readLock().lock();
     try {
-      Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
-
-      return new ClusterResponse(getClusterId(), getClusterName(),
+      return new ClusterResponse(getClusterId(), clusterName,
           getProvisioningState(), getSecurityType(), hosts.keySet(),
           hosts.size(), getDesiredStackVersion().getStackId(),
-          getClusterHealthReport());
+          getClusterHealthReport(hosts));
     } finally {
       clusterGlobalLock.readLock().unlock();
     }
@@ -2356,7 +2362,8 @@ public class ClusterImpl implements Cluster {
     return components.get(componentName).getServiceComponentHosts().keySet();
   }
 
-  private ClusterHealthReport getClusterHealthReport() throws AmbariException {
+  private ClusterHealthReport getClusterHealthReport(
+      Map<String, Host> clusterHosts) throws AmbariException {
 
     int staleConfigsHosts = 0;
     int maintenanceStateHosts = 0;
@@ -2371,17 +2378,11 @@ public class ClusterImpl implements Cluster {
     int alertStatusHosts = 0;
     int heartbeatLostStateHosts = 0;
 
-    Set<String> hostnames;
-
-    try {
-      hostnames = clusters.getHostsForCluster(clusterEntity.getClusterName()).keySet();
-    } catch (AmbariException ignored) {
-      hostnames = Collections.emptySet();
-    }
-
-    for (String hostname : hostnames) {
-
-      Host host = clusters.getHost(hostname);
+    Collection<Host> hosts = clusterHosts.values();
+    Iterator<Host> iterator = hosts.iterator();
+    while (iterator.hasNext()) {
+      Host host = iterator.next();
+      String hostName = host.getHostName();
 
       switch (host.getState()) {
         case HEALTHY:
@@ -2416,8 +2417,8 @@ public class ClusterImpl implements Cluster {
       boolean staleConfig = false;
       boolean maintenanceState = false;
 
-      if (serviceComponentHostsByHost.containsKey(hostname)) {
-        for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) {
+      if (serviceComponentHostsByHost.containsKey(hostName)) {
+        for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostName)) {
           staleConfig = staleConfig || configHelper.isStaleConfigs(sch);
           maintenanceState = maintenanceState ||
             maintenanceStateHelper.getEffectiveState(sch) != MaintenanceState.OFF;

http://git-wip-us.apache.org/repos/asf/ambari/blob/923bf81a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index 97ac2be..1c16ce3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.persistence.RollbackException;
@@ -67,7 +68,6 @@ import org.apache.ambari.server.state.HostHealthStatus;
 import org.apache.ambari.server.state.HostHealthStatus.HealthStatus;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.RepositoryInfo;
-import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
 import org.apache.ambari.server.state.host.HostFactory;
@@ -75,7 +75,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.core.GrantedAuthority;
 
-import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -97,7 +96,7 @@ public class ClustersImpl implements Clusters {
   private final Lock r = rwl.readLock();
   private final Lock w = rwl.writeLock();
 
-  volatile boolean clustersLoaded = false;
+  private volatile boolean clustersLoaded = false;
 
   @Inject
   ClusterDAO clusterDAO;
@@ -141,22 +140,24 @@ public class ClustersImpl implements Clusters {
     LOG.info("Initializing the ClustersImpl");
   }
 
-  void  checkLoaded() {
-    if (!clustersLoaded) {
-      w.lock();
-      try {
-        if (!clustersLoaded) {
-          loadClustersAndHosts();
-        }
-        clustersLoaded = true;
-      } finally {
-        w.unlock();
+  private void checkLoaded() {
+    if (clustersLoaded) {
+      return;
+    }
+
+    w.lock();
+    try {
+      if (!clustersLoaded) {
+        loadClustersAndHosts();
       }
+      clustersLoaded = true;
+    } finally {
+      w.unlock();
     }
   }
 
   @Transactional
-  void loadClustersAndHosts() {
+  private void loadClustersAndHosts() {
     for (ClusterEntity clusterEntity : clusterDAO.findAll()) {
       Cluster currentCluster = clusterFactory.create(clusterEntity);
       clusters.put(clusterEntity.getClusterName(), currentCluster);
@@ -175,7 +176,6 @@ public class ClustersImpl implements Clusters {
         cSet.add(clusters.get(clusterEntity.getClusterName()));
       }
     }
-
   }
 
   @Override
@@ -183,11 +183,6 @@ public class ClustersImpl implements Clusters {
       throws AmbariException {
     checkLoaded();
 
-    if (clusters.containsKey(clusterName)) {
-      throw new DuplicateResourceException("Attempted to create a Cluster which already exists"
-          + ", clusterName=" + clusterName);
-    }
-
     w.lock();
     try {
       if (clusters.containsKey(clusterName)) {
@@ -203,6 +198,7 @@ public class ClustersImpl implements Clusters {
         resourceTypeEntity.setName(ResourceTypeEntity.CLUSTER_RESOURCE_TYPE_NAME);
         resourceTypeEntity = resourceTypeDAO.merge(resourceTypeEntity);
       }
+
       ResourceEntity resourceEntity = new ResourceEntity();
       resourceEntity.setResourceType(resourceTypeEntity);
 
@@ -348,12 +344,13 @@ public class ClustersImpl implements Clusters {
       throw new AmbariException(duplicateMessage);
     }
 
-    r.lock();
+    w.lock();
 
     try {
       HostEntity hostEntity = new HostEntity();
       hostEntity.setHostName(hostname);
       hostEntity.setClusterEntities(new ArrayList<ClusterEntity>());
+
       //not stored to DB
       Host host = hostFactory.create(hostEntity, false);
       host.setAgentVersion(new AgentVersion(""));
@@ -370,7 +367,7 @@ public class ClustersImpl implements Clusters {
             + ", hostname=" + hostname);
       }
     } finally {
-      r.unlock();
+      w.unlock();
     }
 
     // publish the event
@@ -386,11 +383,12 @@ public class ClustersImpl implements Clusters {
   }
 
   @Override
-  public void updateHostWithClusterAndAttributes(Map<String, Set<String>> hostClusters, Map<String,
-      Map<String, String>> hostAttributes)
-      throws AmbariException {
+  public void updateHostWithClusterAndAttributes(
+      Map<String, Set<String>> hostClusters,
+      Map<String, Map<String, String>> hostAttributes) throws AmbariException {
     checkLoaded();
     w.lock();
+
     try {
       if (hostClusters != null) {
         Map<String, Host> hostMap = getHostsMap(hostClusters.keySet());
@@ -398,7 +396,6 @@ public class ClustersImpl implements Clusters {
         for (Set<String> cSet : hostClusters.values()) {
           clusterNames.addAll(cSet);
         }
-        Map<String, Cluster> clusterMap = getClustersMap(clusterNames);
 
         for (String hostname : hostClusters.keySet()) {
           Host host = hostMap.get(hostname);
@@ -406,8 +403,8 @@ public class ClustersImpl implements Clusters {
           if (attributes != null && !attributes.isEmpty()){
             host.setHostAttributes(attributes);
           }
-          host.refresh();
 
+          host.refresh();
 
           Set<String> hostClusterNames = hostClusters.get(hostname);
           for (String clusterName : hostClusterNames) {
@@ -492,48 +489,61 @@ public class ClustersImpl implements Clusters {
    * @param currentClusterVersion Cluster's current stack version
    * @throws AmbariException May throw a DuplicateResourceException.
    */
-  public void mapHostToCluster(String hostname, String clusterName, ClusterVersionEntity currentClusterVersion) throws AmbariException {
+  public void mapHostToCluster(String hostname, String clusterName,
+      ClusterVersionEntity currentClusterVersion) throws AmbariException {
+    Host host = null;
+    Cluster cluster = null;
+
     checkLoaded();
-    w.lock();
 
+    r.lock();
     try {
-      Host host = getHost(hostname);
-      Cluster cluster = getCluster(clusterName);
+      host = getHost(hostname);
+      cluster = getCluster(clusterName);
 
+      // check to ensure there are no duplicates
       for (Cluster c : hostClusterMap.get(hostname)) {
         if (c.getClusterName().equals(clusterName)) {
           throw new DuplicateResourceException("Attempted to create a host which already exists: clusterName=" +
               clusterName + ", hostName=" + hostname);
         }
       }
+    } finally {
+      r.unlock();
+    }
 
-      if (!isOsSupportedByClusterStack(cluster, host)) {
-        String message = "Trying to map host to cluster where stack does not"
-            + " support host's os type"
-            + ", clusterName=" + clusterName
-            + ", clusterStackId=" + cluster.getDesiredStackVersion().getStackId()
-            + ", hostname=" + hostname
-            + ", hostOsFamily=" + host.getOsFamily();
-        LOG.warn(message);
-        throw new AmbariException(message);
-      }
+    if (!isOsSupportedByClusterStack(cluster, host)) {
+      String message = "Trying to map host to cluster where stack does not"
+          + " support host's os type" + ", clusterName=" + clusterName
+          + ", clusterStackId=" + cluster.getDesiredStackVersion().getStackId()
+          + ", hostname=" + hostname + ", hostOsFamily=" + host.getOsFamily();
+      LOG.warn(message);
+      throw new AmbariException(message);
+    }
 
-      mapHostClusterEntities(hostname, cluster.getClusterId());
+    long clusterId = cluster.getClusterId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Mapping host {} to cluster {} (id={})", hostname, clusterName,
+          clusterId);
+    }
 
-      host.refresh();
-      cluster.refresh();
+    w.lock();
+    try {
+      mapHostClusterEntities(hostname, clusterId);
       hostClusterMap.get(hostname).add(cluster);
       clusterHostMap.get(clusterName).add(host);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Mapping a host to a cluster"
-            + ", clusterName=" + clusterName
-            + ", clusterId=" + cluster.getClusterId()
-            + ", hostname=" + hostname);
-      }
     } finally {
       w.unlock();
     }
+
+    ReadWriteLock clusterLock = cluster.getClusterGlobalLock();
+    clusterLock.writeLock().lock();
+    try {
+      host.refresh();
+      cluster.refresh();
+    } finally {
+      clusterLock.writeLock().unlock();
+    }
   }
 
   /**
@@ -544,8 +554,8 @@ public class ClustersImpl implements Clusters {
    * @throws AmbariException May throw a DuplicateResourceException.
    */
   @Override
-  public void mapHostToCluster(String hostname,
-                               String clusterName) throws AmbariException {
+  public void mapHostToCluster(String hostname, String clusterName)
+      throws AmbariException {
     checkLoaded();
 
     ClusterVersionEntity clusterVersionEntity = clusterVersionDAO.findByClusterAndStateCurrent(clusterName);
@@ -616,9 +626,8 @@ public class ClustersImpl implements Clusters {
   @Override
   public Map<String, Host> getHostsForCluster(String clusterName)
       throws AmbariException {
-    if (!clustersLoaded) {
-      checkLoaded();
-    }
+
+    checkLoaded();
     r.lock();
 
     try {
@@ -668,28 +677,33 @@ public class ClustersImpl implements Clusters {
   @Override
   public void unmapHostFromCluster(String hostname, String clusterName)
       throws AmbariException {
+    Host host = null;
+    Cluster cluster = null;
 
     checkLoaded();
 
-    w.lock();
-
+    r.lock();
     try {
-      Host host = getHost(hostname);
-      Cluster cluster = getCluster(clusterName);
+      host = getHost(hostname);
+      cluster = getCluster(clusterName);
+    } finally {
+      r.unlock();
+    }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Unmapping a host from a cluster"
-            + ", clusterName=" + clusterName
-            + ", clusterId=" + cluster.getClusterId()
-            + ", hostname=" + hostname);
-      }
+    long clusterId = cluster.getClusterId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unmapping host {} from cluster {} (id={})", hostname,
+          clusterName, clusterId);
+    }
+
+    w.lock();
 
+    try {
       unmapHostClusterEntities(hostname, cluster.getClusterId());
-      host.refresh();
-      cluster.refresh();
 
       hostClusterMap.get(hostname).remove(cluster);
       clusterHostMap.get(clusterName).remove(host);
+
       host.refresh();
       cluster.refresh();
 
@@ -700,7 +714,6 @@ public class ClustersImpl implements Clusters {
     } finally {
       w.unlock();
     }
-
   }
 
   @Transactional

http://git-wip-us.apache.org/repos/asf/ambari/blob/923bf81a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
new file mode 100644
index 0000000..3a2773f
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
@@ -0,0 +1,239 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ambari.server.state.cluster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.OrmTestHelper;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.StackId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+
+/**
+ * Tests AMBARI-9738 which produced a deadlock during read and writes between
+ * {@link ClustersImpl} and {@link ClusterImpl}.
+ */
+public class ClustersDeadlockTest {
+  private static final String CLUSTER_NAME = "c1";
+  private static final int NUMBER_OF_HOSTS = 100;
+  private static final int NUMBER_OF_THREADS = 3;
+
+  private final AtomicInteger hostNameCounter = new AtomicInteger(0);
+
+  @Inject
+  private Injector injector;
+
+  @Inject
+  private Clusters clusters;
+
+  @Inject
+  private AmbariMetaInfo metaInfo;
+
+  @Inject
+  private OrmTestHelper helper;
+
+  private Cluster cluster;
+
+  @Before
+  public void setup() throws Exception {
+    injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    injector.getInstance(GuiceJpaInitializer.class);
+    injector.injectMembers(this);
+    clusters.addCluster(CLUSTER_NAME);
+
+    StackId stackId = new StackId("HDP-0.1");
+    cluster = clusters.getCluster(CLUSTER_NAME);
+    cluster.setDesiredStackVersion(stackId);
+    helper.getOrCreateRepositoryVersion(stackId.getStackName(), stackId.getStackVersion());
+    cluster.createClusterVersion(stackId.getStackName(), stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING);
+    metaInfo.init();
+  }
+
+  @After
+  public void teardown() {
+    injector.getInstance(PersistService.class).stop();
+  }
+
+  /**
+   * Tests that no deadlock exists when adding hosts from reading from the
+   * cluster.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 35000)
+  public void testDeadlockWhileMappingHosts() throws Exception {
+    List<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
+      ClusterReaderThread readerThread = new ClusterReaderThread();
+      ClustersHostMapperThread writerThread = new ClustersHostMapperThread();
+
+      threads.add(readerThread);
+      threads.add(writerThread);
+
+      readerThread.start();
+      writerThread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    Assert.assertEquals(NUMBER_OF_THREADS * NUMBER_OF_HOSTS,
+        clusters.getHostsForCluster(CLUSTER_NAME).size());
+  }
+
+  /**
+   * Tests that no deadlock exists when adding hosts from reading from the
+   * cluster.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 35000)
+  public void testDeadlockWhileUnmappingHosts() throws Exception {
+    List<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
+      ClusterReaderThread readerThread = new ClusterReaderThread();
+      ClustersHostUnMapperThread writerThread = new ClustersHostUnMapperThread();
+
+      threads.add(readerThread);
+      threads.add(writerThread);
+
+      readerThread.start();
+      writerThread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    Assert.assertEquals(0,
+        clusters.getHostsForCluster(CLUSTER_NAME).size());
+  }
+
+  /**
+   * The {@link ClusterReaderThread} reads from a cluster over and over again
+   * with a slight pause.
+   */
+  private final class ClusterReaderThread extends Thread {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < 1000; i++) {
+          cluster.convertToResponse();
+          Thread.sleep(10);
+        }
+      } catch (Exception exception) {
+        throw new RuntimeException(exception);
+      }
+    }
+  }
+
+  /**
+   * The {@link ClustersHostMapperThread} is used to map hosts to a cluster over
+   * and over.
+   */
+  private final class ClustersHostMapperThread extends Thread {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < NUMBER_OF_HOSTS; i++) {
+          String hostName = "c64-" + hostNameCounter.getAndIncrement();
+          clusters.addHost(hostName);
+          setOsFamily(clusters.getHost(hostName), "redhat", "6.4");
+          clusters.getHost(hostName).persist();
+          clusters.mapHostToCluster(hostName, CLUSTER_NAME);
+
+          Thread.sleep(10);
+        }
+      } catch (Exception exception) {
+        throw new RuntimeException(exception);
+      }
+    }
+  }
+
+  /**
+   * The {@link ClustersHostUnMapperThread} is used to unmap hosts to a cluster
+   * over and over.
+   */
+  private final class ClustersHostUnMapperThread extends Thread {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run() {
+      List<String> hostNames = new ArrayList<String>(100);
+      try {
+        // pre-map the hosts
+        for (int i = 0; i < NUMBER_OF_HOSTS; i++) {
+          String hostName = "c64-" + hostNameCounter.getAndIncrement();
+          hostNames.add(hostName);
+
+          clusters.addHost(hostName);
+          setOsFamily(clusters.getHost(hostName), "redhat", "6.4");
+          clusters.getHost(hostName).persist();
+          clusters.mapHostToCluster(hostName, CLUSTER_NAME);
+        }
+
+        // unmap them all now
+        for (String hostName : hostNames) {
+          clusters.unmapHostFromCluster(hostName, CLUSTER_NAME);
+          Thread.sleep(10);
+        }
+      } catch (Exception exception) {
+        throw new RuntimeException(exception);
+      }
+    }
+  }
+
+
+  private void setOsFamily(Host host, String osFamily, String osVersion) {
+    Map<String, String> hostAttributes = new HashMap<String, String>(2);
+    hostAttributes.put("os_family", osFamily);
+    hostAttributes.put("os_release_version", osVersion);
+    host.setHostAttributes(hostAttributes);
+  }
+}

Reply via email to