AMBARI-12026. During rolling upgrade process, Ambari becomes unresponsive. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/482de8df Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/482de8df Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/482de8df Branch: refs/heads/trunk Commit: 482de8df4e85fb7d8fcc93026d5e0631d5484d88 Parents: 4ce1e83 Author: Myroslav Papirkovskyy <mpapyrkovs...@hortonworks.com> Authored: Sun Jun 21 04:28:11 2015 +0300 Committer: Myroslav Papirkovskyy <mpapyrkovs...@hortonworks.com> Committed: Sun Jun 21 05:04:01 2015 +0300 ---------------------------------------------------------------------- .../apache/ambari/server/state/ConfigImpl.java | 309 +++++++++++++------ .../state/cluster/ClusterDeadlockTest.java | 105 ++++++- .../server/testing/DeadlockWarningThread.java | 61 ++-- 3 files changed, 336 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/482de8df/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java index 71604c1..ea6aecd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java @@ -23,6 +23,8 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ServiceConfigDAO; @@ -46,13 +48,15 @@ public class ConfigImpl implements Config { public static final String GENERATED_TAG_PREFIX = "generatedTag_"; + private final ReadWriteLock 
readWriteLock = new ReentrantReadWriteLock(); + private Cluster cluster; private StackId stackId; private String type; - private String tag; - private Long version; - private Map<String, String> properties; - private Map<String, Map<String, String>> propertiesAttributes; + private volatile String tag; + private volatile Long version; + private volatile Map<String, String> properties; + private volatile Map<String, Map<String, String>> propertiesAttributes; private ClusterConfigEntity entity; @Inject @@ -105,93 +109,208 @@ public class ConfigImpl implements Config { * {@inheritDoc} */ @Override - public synchronized StackId getStackId() { - return stackId; + public StackId getStackId() { + readWriteLock.readLock().lock(); + try { + return stackId; + } finally { + readWriteLock.readLock().unlock(); + } + } @Override - public synchronized void setStackId(StackId stackId) { - this.stackId = stackId; + public void setStackId(StackId stackId) { + readWriteLock.writeLock().lock(); + try { + this.stackId = stackId; + } finally { + readWriteLock.writeLock().unlock(); + } + } @Override public String getType() { - return type; + readWriteLock.readLock().lock(); + try { + return type; + } finally { + readWriteLock.readLock().unlock(); + } + } @Override - public synchronized String getTag() { + public String getTag() { if (tag == null) { - tag = GENERATED_TAG_PREFIX + getVersion(); + readWriteLock.writeLock().lock(); + try { + if (tag == null) { + tag = GENERATED_TAG_PREFIX + getVersion(); + } + } finally { + readWriteLock.writeLock().unlock(); + } + } + + readWriteLock.readLock().lock(); + try { + + return tag; + } finally { + readWriteLock.readLock().unlock(); } - return tag; + } @Override - public synchronized Long getVersion() { + public Long getVersion() { if (version == null && cluster != null) { - version = cluster.getNextConfigVersion(type); + readWriteLock.writeLock().lock(); + try { + if (version == null) { + version = cluster.getNextConfigVersion(type); //pure DB 
calculation call, no cluster locking required + } + } finally { + readWriteLock.writeLock().unlock(); + } + } + + readWriteLock.readLock().lock(); + try { + return version; + } finally { + readWriteLock.readLock().unlock(); } - return version; + } @Override - public synchronized Map<String, String> getProperties() { + public Map<String, String> getProperties() { if (null != entity && null == properties) { + readWriteLock.writeLock().lock(); + try { + if (properties == null) { + properties = gson.<Map<String, String>>fromJson(entity.getData(), Map.class); + } + } finally { + readWriteLock.writeLock().unlock(); + } + } - properties = gson.<Map<String, String>>fromJson(entity.getData(), Map.class); - + readWriteLock.readLock().lock(); + try { + return null == properties ? new HashMap<String, String>() + : new HashMap<String, String>(properties); + } finally { + readWriteLock.readLock().unlock(); } - return null == properties ? new HashMap<String, String>() - : new HashMap<String, String>(properties); + } @Override - public synchronized Map<String, Map<String, String>> getPropertiesAttributes() { + public Map<String, Map<String, String>> getPropertiesAttributes() { if (null != entity && null == propertiesAttributes) { - propertiesAttributes = gson.<Map<String, Map<String, String>>>fromJson(entity.getAttributes(), Map.class); + readWriteLock.writeLock().lock(); + try { + if (propertiesAttributes == null) { + propertiesAttributes = gson.<Map<String, Map<String, String>>>fromJson(entity.getAttributes(), Map.class); + } + } finally { + readWriteLock.writeLock().unlock(); + } + } + + readWriteLock.readLock().lock(); + try { + return null == propertiesAttributes ? null : new HashMap<String, Map<String, String>>(propertiesAttributes); + } finally { + readWriteLock.readLock().unlock(); } - return null == propertiesAttributes ? 
null : new HashMap<String, Map<String, String>>(propertiesAttributes); + } @Override - public synchronized void setTag(String tag) { - this.tag = tag; + public void setTag(String tag) { + readWriteLock.writeLock().lock(); + try { + this.tag = tag; + } finally { + readWriteLock.writeLock().unlock(); + } + } @Override - public synchronized void setVersion(Long version) { - this.version = version; + public void setVersion(Long version) { + readWriteLock.writeLock().lock(); + try { + this.version = version; + } finally { + readWriteLock.writeLock().unlock(); + } + } @Override - public synchronized void setProperties(Map<String, String> properties) { - this.properties = properties; + public void setProperties(Map<String, String> properties) { + readWriteLock.writeLock().lock(); + try { + this.properties = properties; + } finally { + readWriteLock.writeLock().unlock(); + } + } @Override public void setPropertiesAttributes(Map<String, Map<String, String>> propertiesAttributes) { - this.propertiesAttributes = propertiesAttributes; + readWriteLock.writeLock().lock(); + try { + this.propertiesAttributes = propertiesAttributes; + } finally { + readWriteLock.writeLock().unlock(); + } + } @Override - public synchronized void updateProperties(Map<String, String> properties) { - this.properties.putAll(properties); + public void updateProperties(Map<String, String> properties) { + readWriteLock.writeLock().lock(); + try { + this.properties.putAll(properties); + } finally { + readWriteLock.writeLock().unlock(); + } + } @Override - public synchronized List<Long> getServiceConfigVersions() { - if (cluster == null || type == null || version == null) { - return Collections.emptyList(); + public List<Long> getServiceConfigVersions() { + readWriteLock.readLock().lock(); + try { + if (cluster == null || type == null || version == null) { + return Collections.emptyList(); + } + return serviceConfigDAO.getServiceConfigVersionsByConfig(cluster.getClusterId(), type, version); + } finally { + 
readWriteLock.readLock().unlock(); } - return serviceConfigDAO.getServiceConfigVersionsByConfig(cluster.getClusterId(), type, version); + } @Override - public synchronized void deleteProperties(List<String> properties) { - for (String key : properties) { - this.properties.remove(key); + public void deleteProperties(List<String> properties) { + readWriteLock.writeLock().lock(); + try { + for (String key : properties) { + this.properties.remove(key); + } + } finally { + readWriteLock.writeLock().unlock(); } + } @Override @@ -201,58 +320,70 @@ public class ConfigImpl implements Config { @Override @Transactional - public synchronized void persist(boolean newConfig) { - ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId()); - - if (newConfig) { - ClusterConfigEntity entity = new ClusterConfigEntity(); - entity.setClusterEntity(clusterEntity); - entity.setClusterId(cluster.getClusterId()); - entity.setType(getType()); - entity.setVersion(getVersion()); - entity.setTag(getTag()); - entity.setTimestamp(new Date().getTime()); - entity.setStack(clusterEntity.getDesiredStack()); - entity.setData(gson.toJson(getProperties())); - - if (null != getPropertiesAttributes()) { - entity.setAttributes(gson.toJson(getPropertiesAttributes())); - } - - clusterDAO.createConfig(entity); - clusterEntity.getClusterConfigEntities().add(entity); - - // save the entity, forcing a flush to ensure the refresh picks up the - // newest data - clusterDAO.merge(clusterEntity, true); - cluster.refresh(); - } else { - // only supporting changes to the properties - ClusterConfigEntity entity = null; - - // find the existing configuration to update - for (ClusterConfigEntity cfe : clusterEntity.getClusterConfigEntities()) { - if (getTag().equals(cfe.getTag()) && - getType().equals(cfe.getType()) && - getVersion().equals(cfe.getVersion())) { - entity = cfe; - break; + public void persist(boolean newConfig) { + cluster.getClusterGlobalLock().writeLock().lock(); //null cluster is not 
expected, NPE anyway later in code + try { + readWriteLock.writeLock().lock(); + try { + ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId()); + + if (newConfig) { + ClusterConfigEntity entity = new ClusterConfigEntity(); + entity.setClusterEntity(clusterEntity); + entity.setClusterId(cluster.getClusterId()); + entity.setType(getType()); + entity.setVersion(getVersion()); + entity.setTag(getTag()); + entity.setTimestamp(new Date().getTime()); + entity.setStack(clusterEntity.getDesiredStack()); + entity.setData(gson.toJson(getProperties())); + + if (null != getPropertiesAttributes()) { + entity.setAttributes(gson.toJson(getPropertiesAttributes())); + } + + clusterDAO.createConfig(entity); + clusterEntity.getClusterConfigEntities().add(entity); + + // save the entity, forcing a flush to ensure the refresh picks up the + // newest data + clusterDAO.merge(clusterEntity, true); + cluster.refresh(); + } else { + // only supporting changes to the properties + ClusterConfigEntity entity = null; + + // find the existing configuration to update + for (ClusterConfigEntity cfe : clusterEntity.getClusterConfigEntities()) { + if (getTag().equals(cfe.getTag()) && + getType().equals(cfe.getType()) && + getVersion().equals(cfe.getVersion())) { + entity = cfe; + break; + } + } + + // if the configuration was found, then update it + if (null != entity) { + LOG.debug( + "Updating {} version {} with new configurations; a new version will not be created", + getType(), getVersion()); + + entity.setData(gson.toJson(getProperties())); + + // save the entity, forcing a flush to ensure the refresh picks up the + // newest data + clusterDAO.merge(clusterEntity, true); + cluster.refresh(); + } } + } finally { + readWriteLock.writeLock().unlock(); } - - // if the configuration was found, then update it - if (null != entity) { - LOG.debug( - "Updating {} version {} with new configurations; a new version will not be created", - getType(), getVersion()); - - 
entity.setData(gson.toJson(getProperties())); - - // save the entity, forcing a flush to ensure the refresh picks up the - // newest data - clusterDAO.merge(clusterEntity, true); - cluster.refresh(); - } + } finally { + cluster.getClusterGlobalLock().writeLock().unlock(); } + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/482de8df/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java index 08f9743..847de7d 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java @@ -18,9 +18,27 @@ package org.apache.ambari.server.state.cluster; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Config; +import org.apache.ambari.server.state.ConfigFactory; +import org.apache.ambari.server.state.Host; +import org.apache.ambari.server.state.MaintenanceState; +import org.apache.ambari.server.state.RepositoryVersionState; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponent; +import org.apache.ambari.server.state.ServiceComponentFactory; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceComponentHostFactory; +import org.apache.ambari.server.state.ServiceFactory; +import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.State; import org.apache.ambari.server.testing.DeadlockWarningThread; + import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import 
java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -32,19 +50,6 @@ import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncLis import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; -import org.apache.ambari.server.state.Cluster; -import org.apache.ambari.server.state.Clusters; -import org.apache.ambari.server.state.Host; -import org.apache.ambari.server.state.MaintenanceState; -import org.apache.ambari.server.state.RepositoryVersionState; -import org.apache.ambari.server.state.Service; -import org.apache.ambari.server.state.ServiceComponent; -import org.apache.ambari.server.state.ServiceComponentFactory; -import org.apache.ambari.server.state.ServiceComponentHost; -import org.apache.ambari.server.state.ServiceComponentHostFactory; -import org.apache.ambari.server.state.ServiceFactory; -import org.apache.ambari.server.state.StackId; -import org.apache.ambari.server.state.State; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -85,6 +90,9 @@ public class ClusterDeadlockTest { private ServiceComponentHostFactory serviceComponentHostFactory; @Inject + private ConfigFactory configFactory; + + @Inject private OrmTestHelper helper; private StackId stackId = new StackId("HDP-0.1"); @@ -117,6 +125,16 @@ public class ClusterDeadlockTest { cluster.createClusterVersion(stackId, stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING); + Config config1 = configFactory.createNew(cluster, "test-type1", new HashMap<String, String>(), new HashMap<String, + Map<String, String>>()); + Config config2 = configFactory.createNew(cluster, "test-type2", new HashMap<String, String>(), new HashMap<String, + Map<String, String>>()); + config1.persist(); + config2.persist(); + cluster.addConfig(config1); + cluster.addConfig(config2); + cluster.addDesiredConfig("test user", 
new HashSet<Config>(Arrays.asList(config1, config2))); + // 100 hosts for (int i = 0; i < NUMBER_OF_HOSTS; i++) { String hostName = "c64-" + i; @@ -270,6 +288,67 @@ public class ClusterDeadlockTest { } } + @Test + public void testDeadlockWithConfigsUpdate() throws Exception { + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < NUMBER_OF_THREADS; i++) { + ClusterDesiredConfigsReaderThread readerThread = null; + for (int j = 0; j < NUMBER_OF_THREADS; j++) { + readerThread = new ClusterDesiredConfigsReaderThread(); + threads.add(readerThread); + } + for (Config config : cluster.getAllConfigs()) { + ConfigUpdaterThread configUpdaterThread = new ConfigUpdaterThread(config); + threads.add(configUpdaterThread); + } + + } + + for (Thread thread : threads) { + thread.start(); + } + + DeadlockWarningThread wt = new DeadlockWarningThread(threads); + + while (true) { + if(!wt.isAlive()) { + break; + } + } + if (wt.isDeadlocked()){ + Assert.assertFalse(wt.getErrorMessages().toString(), wt.isDeadlocked()); + } else { + Assert.assertFalse(wt.isDeadlocked()); + } + + + } + + + private final class ClusterDesiredConfigsReaderThread extends Thread { + @Override + public void run() { + for (int i =0; i<1500; i++) { + cluster.getDesiredConfigs(); + } + } + } + + private final class ConfigUpdaterThread extends Thread { + private Config config; + + public ConfigUpdaterThread(Config config) { + this.config = config; + } + + @Override + public void run() { + for (int i =0; i<500; i++) { + config.persist(false); + } + } + } + /** * The {@link ClusterReaderThread} reads from a cluster over and over again * with a slight pause. 
http://git-wip-us.apache.org/repos/asf/ambari/blob/482de8df/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java index b1237df..589aee1 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java @@ -16,16 +16,15 @@ */ package org.apache.ambari.server.testing; +import org.apache.commons.lang.ArrayUtils; + import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.TreeSet; /** * @@ -34,13 +33,11 @@ import java.util.TreeSet; */ public class DeadlockWarningThread extends Thread { - private Thread parentThread; private final List<String> errorMessages; private static final int MAX_STACK_DEPTH = 30; private Collection<Thread> monitoredThreads = null; private boolean deadlocked = false; private static final ThreadMXBean mbean = ManagementFactory.getThreadMXBean(); - private String stacktrace = ""; public List<String> getErrorMessages() { return errorMessages; @@ -53,11 +50,10 @@ public class DeadlockWarningThread extends Thread { public DeadlockWarningThread(Collection<Thread> monitoredThreads) { this.errorMessages = new ArrayList<String>(); this.monitoredThreads = monitoredThreads; - parentThread = Thread.currentThread(); start(); } - public String getThreadsStacktraces(long[] ids) { + public String getThreadsStacktraces(Collection<Long> ids) { StringBuilder errBuilder = new StringBuilder(); for (long id : 
ids) { ThreadInfo ti = mbean.getThreadInfo(id, MAX_STACK_DEPTH); @@ -83,7 +79,7 @@ public class DeadlockWarningThread extends Thread { long[] ids = mbean.findMonitorDeadlockedThreads(); StringBuilder errBuilder = new StringBuilder(); if (ids != null && ids.length > 0) { - errBuilder.append(getThreadsStacktraces(ids)); + errBuilder.append(getThreadsStacktraces(Arrays.asList(ArrayUtils.toObject(ids)))); errorMessages.add(errBuilder.toString()); System.out.append(errBuilder.toString()); //Exit if deadlocks have been found @@ -92,42 +88,33 @@ public class DeadlockWarningThread extends Thread { } else { //Exit if all monitored threads were finished boolean hasLive = false; - Set<Thread> activeThreads = new HashSet<Thread>(); + boolean hasRunning = false; for (Thread monTh : monitoredThreads) { - ThreadGroup group = monTh.getThreadGroup(); - Thread[] groupThreads = new Thread[group.activeCount()]; - group.enumerate(groupThreads, true); - activeThreads.addAll(Arrays.asList(groupThreads)); - } - activeThreads.remove(Thread.currentThread()); - activeThreads.remove(parentThread); - Set<Long> idSet = new TreeSet<Long>(); - for (Thread activeThread : activeThreads) { - if (activeThread.isAlive()) { + State state = monTh.getState(); + if (state != State.TERMINATED && state != State.NEW) { hasLive = true; - idSet.add(activeThread.getId()); - } + } + if (state == State.RUNNABLE || state == State.TIMED_WAITING) { + hasRunning = true; + break; + } } - long[] tid = new long[idSet.size()]; + if (!hasLive) { deadlocked = false; break; - } else { - int cnt = 0; - for (Long id : idSet) { - tid[cnt] = id; - cnt++; - } - String currentStackTrace = getThreadsStacktraces(tid); - if (stacktrace.equals(currentStackTrace)) { - errBuilder.append(currentStackTrace); - errorMessages.add(currentStackTrace); - System.out.append(currentStackTrace); - deadlocked = true; - break; - } else { - stacktrace = currentStackTrace; + } else if (!hasRunning) { + List<Long> tIds = new ArrayList<Long>(); + for 
(Thread monitoredThread : monitoredThreads) { + State state = monitoredThread.getState(); + if (state == State.WAITING || state == State.BLOCKED) { + tIds.add(monitoredThread.getId()); + } } + errBuilder.append(getThreadsStacktraces(tIds)); + errorMessages.add(errBuilder.toString()); + deadlocked = true; + break; } } }