https://issues.apache.org/jira/browse/AMQ-5125
Fix for a potential deadlock: when external classes synchronize on the LevelDBStore instance, the hawtDispatch runner thread can deadlock if one of its tasks also attempts to take that same lock to protect mutable state. The store now guards that state with a private lock object instead of synchronizing on itself. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8f438106 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8f438106 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8f438106 Branch: refs/heads/activemq-5.10.x Commit: 8f43810679ad9a1d3ab4a28397ad0229fc8ffe31 Parents: 8ae10b9 Author: Timothy Bish <[email protected]> Authored: Mon Jul 7 17:53:46 2014 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Mon Dec 15 19:06:48 2014 -0500 ---------------------------------------------------------------------- .../apache/activemq/leveldb/LevelDBStore.scala | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8f438106/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 146269d..42b25c6 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -183,6 +183,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]() val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]() + private val lock = new Object(); + def check_running = { if( this.isStopped ) { throw new 
SuppressReplyException("Store has been stopped") @@ -539,12 +541,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def getPList(name: String): PList = { - this.synchronized(plists.get(name)).getOrElse(db.createPList(name)) + lock.synchronized(plists.get(name)).getOrElse(db.createPList(name)) } def createPList(name: String, key: Long):LevelDBStore#LevelDBPList = { var rc = new LevelDBPList(name, key) - this.synchronized { + lock.synchronized { plists.put(name, rc) } rc @@ -572,30 +574,30 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def createQueueMessageStore(destination: ActiveMQQueue):LevelDBStore#LevelDBMessageStore = { - this.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination)) + lock.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination)) } def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBStore#LevelDBMessageStore = { var rc = new LevelDBMessageStore(destination, key) - this.synchronized { + lock.synchronized { queues.put(destination, rc) } rc } - def removeQueueMessageStore(destination: ActiveMQQueue): Unit = this synchronized { + def removeQueueMessageStore(destination: ActiveMQQueue): Unit = lock synchronized { queues.remove(destination).foreach { store=> db.destroyQueueStore(store.key) } } def createTopicMessageStore(destination: ActiveMQTopic):LevelDBStore#LevelDBTopicMessageStore = { - this.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination)) + lock.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination)) } def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBStore#LevelDBTopicMessageStore = { var rc = new LevelDBTopicMessageStore(destination, key) - this synchronized { + lock synchronized { topics.put(destination, rc) topicsById.put(key, rc) } @@ -772,7 +774,7 @@ class LevelDBStore extends LockableServiceSupport with 
BrokerServiceAware with P // This gts called when the store is first loading up, it restores // the existing durable subs.. def createSubscription(sub:DurableSubscription) = { - this.synchronized(topicsById.get(sub.topicKey)) match { + lock.synchronized(topicsById.get(sub.topicKey)) match { case Some(topic) => topic.synchronized { topic.subscriptions.put((sub.info.getClientId, sub.info.getSubcriptionName), sub) @@ -785,7 +787,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def getTopicGCPositions = { import collection.JavaConversions._ - val topics = this.synchronized { + val topics = lock.synchronized { new ArrayList(topicsById.values()) } topics.flatMap(_.gcPosition).toSeq
