Repository: cloudstack
Updated Branches:
  refs/heads/master e427d0004 -> ffaabdc13
CLOUDSTACK-7832: Move some job db update and item purge to completeAsyncJob transaction to avoid MySQL deadlock. Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/ffaabdc1 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/ffaabdc1 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/ffaabdc1 Branch: refs/heads/master Commit: ffaabdc13fde0f0f7b2667a483006e2a4b805f63 Parents: e427d00 Author: Min Chen <min.c...@citrix.com> Authored: Mon Nov 3 10:41:36 2014 -0800 Committer: Min Chen <min.c...@citrix.com> Committed: Mon Nov 3 10:41:36 2014 -0800 ---------------------------------------------------------------------- .../db/src/com/cloud/utils/db/Transaction.java | 8 ++++++++ framework/ipc/pom.xml | 18 ++++++++++++++++++ .../framework/messagebus/MessageBusBase.java | 14 +++++++++++++- .../framework/jobs/impl/AsyncJobManagerImpl.java | 11 ++++++----- 4 files changed, 45 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ffaabdc1/framework/db/src/com/cloud/utils/db/Transaction.java ---------------------------------------------------------------------- diff --git a/framework/db/src/com/cloud/utils/db/Transaction.java b/framework/db/src/com/cloud/utils/db/Transaction.java index 471e0cf..dd91a96 100755 --- a/framework/db/src/com/cloud/utils/db/Transaction.java +++ b/framework/db/src/com/cloud/utils/db/Transaction.java @@ -18,11 +18,15 @@ package com.cloud.utils.db; import java.util.concurrent.atomic.AtomicLong; +import org.apache.log4j.Logger; + public class Transaction { private final static AtomicLong counter = new AtomicLong(0); private final static TransactionStatus STATUS = new TransactionStatus() { }; + private static final Logger s_logger = Logger.getLogger(Transaction.class); + @SuppressWarnings("deprecation") public static <T, E extends Throwable> T 
execute(TransactionCallbackWithException<T, E> callback) throws E { String name = "tx-" + counter.incrementAndGet(); @@ -33,6 +37,10 @@ public class Transaction { } TransactionLegacy txn = TransactionLegacy.open(name, databaseId, false); try { +// if (txn.dbTxnStarted()){ +// String warnMsg = "Potential Wrong Usage: TRANSACTION.EXECUTE IS WRAPPED INSIDE ANOTHER DB TRANSACTION!"; +// s_logger.warn(warnMsg, new CloudRuntimeException(warnMsg)); +// } txn.start(); T result = callback.doInTransaction(STATUS); txn.commit(); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ffaabdc1/framework/ipc/pom.xml ---------------------------------------------------------------------- diff --git a/framework/ipc/pom.xml b/framework/ipc/pom.xml index a5a5379..2a281df 100644 --- a/framework/ipc/pom.xml +++ b/framework/ipc/pom.xml @@ -39,4 +39,22 @@ <version>${project.version}</version> </dependency> </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + <executions> + <execution> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ffaabdc1/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java index e8f9bce..e3eeb7b 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java @@ -30,6 +30,9 @@ import org.apache.log4j.Logger; import org.apache.cloudstack.framework.serializer.MessageSerializer; +import 
com.cloud.utils.db.TransactionLegacy; +import com.cloud.utils.exception.CloudRuntimeException; + public class MessageBusBase implements MessageBus { private final Gate _gate; @@ -158,7 +161,11 @@ public class MessageBusBase implements MessageBus { @Override public void publish(String senderAddress, String subject, PublishScope scope, Object args) { - + // publish cannot be in DB transaction, which may hold DB lock too long, and we are guarding this here + if (!noDbTxn()){ + String errMsg = "NO EVENT PUBLISH CAN BE WRAPPED WITHIN DB TRANSACTION!"; + s_logger.error(errMsg, new CloudRuntimeException(errMsg)); + } if (_gate.enter(true)) { if (s_logger.isTraceEnabled()) { s_logger.trace("Enter gate in message bus publish"); @@ -256,6 +263,11 @@ public class MessageBusBase implements MessageBus { } } + private boolean noDbTxn() { + TransactionLegacy txn = TransactionLegacy.currentTxn(); + return !txn.dbTxnStarted(); + } + // // Support inner classes // http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ffaabdc1/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 04fab24..aab1683 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -258,6 +258,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } job.setLastUpdated(DateUtil.currentGMTTime()); + job.setExecutingMsid(null); _jobDao.update(jobId, job); if (s_logger.isDebugEnabled()) { @@ -266,6 +267,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId); 
_joinMapDao.disjoinAllJobs(jobId); + // purge the job sync item from queue + if (job.getSyncSource() != null) { + _queueMgr.purgeItem(job.getSyncSource().getId()); + } + return wakeupList; } }); @@ -527,12 +533,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } finally { // guard final clause as well try { - AsyncJobVO jobToUpdate = _jobDao.findById(job.getId()); - jobToUpdate.setExecutingMsid(null); - _jobDao.update(job.getId(), jobToUpdate); - if (job.getSyncSource() != null) { - _queueMgr.purgeItem(job.getSyncSource().getId()); checkQueue(job.getSyncSource().getQueueId()); }