Repository: cloudstack
Updated Branches:
  refs/heads/master e427d0004 -> ffaabdc13


CLOUDSTACK-7832: Move some job db update and item purge to
    completeAsyncJob transaction to avoid MySQL deadlock.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/ffaabdc1
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/ffaabdc1
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/ffaabdc1

Branch: refs/heads/master
Commit: ffaabdc13fde0f0f7b2667a483006e2a4b805f63
Parents: e427d00
Author: Min Chen <min.c...@citrix.com>
Authored: Mon Nov 3 10:41:36 2014 -0800
Committer: Min Chen <min.c...@citrix.com>
Committed: Mon Nov 3 10:41:36 2014 -0800

----------------------------------------------------------------------
 .../db/src/com/cloud/utils/db/Transaction.java    |  8 ++++++++
 framework/ipc/pom.xml                             | 18 ++++++++++++++++++
 .../framework/messagebus/MessageBusBase.java      | 14 +++++++++++++-
 .../framework/jobs/impl/AsyncJobManagerImpl.java  | 11 ++++++-----
 4 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ffaabdc1/framework/db/src/com/cloud/utils/db/Transaction.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/Transaction.java 
b/framework/db/src/com/cloud/utils/db/Transaction.java
index 471e0cf..dd91a96 100755
--- a/framework/db/src/com/cloud/utils/db/Transaction.java
+++ b/framework/db/src/com/cloud/utils/db/Transaction.java
@@ -18,11 +18,15 @@ package com.cloud.utils.db;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.log4j.Logger;
+
 public class Transaction {
     private final static AtomicLong counter = new AtomicLong(0);
     private final static TransactionStatus STATUS = new TransactionStatus() {
     };
 
+    private static final Logger s_logger = Logger.getLogger(Transaction.class);
+
     @SuppressWarnings("deprecation")
     public static <T, E extends Throwable> T 
execute(TransactionCallbackWithException<T, E> callback) throws E {
         String name = "tx-" + counter.incrementAndGet();
@@ -33,6 +37,10 @@ public class Transaction {
         }
         TransactionLegacy txn = TransactionLegacy.open(name, databaseId, 
false);
         try {
+//            if (txn.dbTxnStarted()){
+//                String warnMsg = "Potential Wrong Usage: TRANSACTION.EXECUTE 
IS WRAPPED INSIDE ANOTHER DB TRANSACTION!";
+//                s_logger.warn(warnMsg, new CloudRuntimeException(warnMsg));
+//            }
             txn.start();
             T result = callback.doInTransaction(STATUS);
             txn.commit();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ffaabdc1/framework/ipc/pom.xml
----------------------------------------------------------------------
diff --git a/framework/ipc/pom.xml b/framework/ipc/pom.xml
index a5a5379..2a281df 100644
--- a/framework/ipc/pom.xml
+++ b/framework/ipc/pom.xml
@@ -39,4 +39,22 @@
       <version>${project.version}</version>
     </dependency>    
   </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <skipTests>true</skipTests>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>  
 </project>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ffaabdc1/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
index e8f9bce..e3eeb7b 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
@@ -30,6 +30,9 @@ import org.apache.log4j.Logger;
 
 import org.apache.cloudstack.framework.serializer.MessageSerializer;
 
+import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.exception.CloudRuntimeException;
+
 public class MessageBusBase implements MessageBus {
 
     private final Gate _gate;
@@ -158,7 +161,11 @@ public class MessageBusBase implements MessageBus {
 
     @Override
     public void publish(String senderAddress, String subject, PublishScope 
scope, Object args) {
-
+        // publish cannot be in DB transaction, which may hold DB lock too 
long, and we are guarding this here
+        if (!noDbTxn()){
+            String errMsg = "NO EVENT PUBLISH CAN BE WRAPPED WITHIN DB 
TRANSACTION!";
+            s_logger.error(errMsg, new CloudRuntimeException(errMsg));
+        }
         if (_gate.enter(true)) {
             if (s_logger.isTraceEnabled()) {
                 s_logger.trace("Enter gate in message bus publish");
@@ -256,6 +263,11 @@ public class MessageBusBase implements MessageBus {
         }
     }
 
+    private boolean noDbTxn() {
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
+        return !txn.dbTxnStarted();
+    }
+
     //
     // Support inner classes
     //

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ffaabdc1/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 04fab24..aab1683 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -258,6 +258,7 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
                 }
 
                 job.setLastUpdated(DateUtil.currentGMTTime());
+                job.setExecutingMsid(null);
                 _jobDao.update(jobId, job);
 
                 if (s_logger.isDebugEnabled()) {
@@ -266,6 +267,11 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
                 List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
                 _joinMapDao.disjoinAllJobs(jobId);
 
+                // purge the job sync item from queue
+                if (job.getSyncSource() != null) {
+                    _queueMgr.purgeItem(job.getSyncSource().getId());
+                }
+
                 return wakeupList;
             }
         });
@@ -527,12 +533,7 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
                 } finally {
                     // guard final clause as well
                     try {
-                        AsyncJobVO jobToUpdate = _jobDao.findById(job.getId());
-                        jobToUpdate.setExecutingMsid(null);
-                        _jobDao.update(job.getId(), jobToUpdate);
-
                         if (job.getSyncSource() != null) {
-                            _queueMgr.purgeItem(job.getSyncSource().getId());
                             checkQueue(job.getSyncSource().getQueueId());
                         }
 

Reply via email to