Deadlock fix

Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/0233044b
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/0233044b
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/0233044b

Branch: refs/heads/vmsync
Commit: 0233044b20bab0e0f7822660aa9e08bf530409ee
Parents: c6ba4a2
Author: Alex Huang <[email protected]>
Authored: Thu Jun 27 14:37:59 2013 -0700
Committer: Alex Huang <[email protected]>
Committed: Thu Jun 27 14:37:59 2013 -0700

----------------------------------------------------------------------
 .../cloudstack/framework/jobs/AsyncJob.java     |   2 +-
 .../framework/jobs/AsyncJobManager.java         |   2 -
 .../jobs/dao/AsyncJobJoinMapDaoImpl.java        |   2 +-
 .../jobs/impl/AsyncJobManagerImpl.java          | 157 ++++++++++++-------
 4 files changed, 103 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
index 2ed75a9..61fb396 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
@@ -32,7 +32,7 @@ public interface AsyncJob extends JobInfo {
         public static final String JOB_STATE = "job.state";
     }
     
-    public static interface Contants {
+    public static interface Constants {
 
        // Although we may have detailed masks for each individual wakeup 
event, i.e.
         // periodical timer, matched topic from message bus, it seems that we 
don't

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
index 440188a..bc06101 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -37,8 +37,6 @@ public interface AsyncJobManager extends Manager {
 
     void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int 
resultCode, String result);
 
-    List<Long> wakeupByJoinedJobCompletion(long joinedJobId);
-
     void updateAsyncJobStatus(long jobId, int processStatus, String 
resultObject);
     void updateAsyncJobAttachment(long jobId, String instanceType, Long 
instanceId);
     void logJobJournal(long jobId, AsyncJob.JournalType journalType, String

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index ba6cfbc..fa3b14b 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -169,7 +169,7 @@ public class AsyncJobJoinMapDaoImpl extends 
GenericDaoBase<AsyncJobJoinMapVO, Lo
                        String sql = "UPDATE async_job SET 
job_pending_signals=? WHERE id IN " +
                                        "(SELECT job_id FROM async_job_join_map 
WHERE next_wakeup < ? AND expiration > ?)";
                        pstmt = txn.prepareStatement(sql);
-                       pstmt.setInt(1, AsyncJob.Contants.SIGNAL_MASK_WAKEUP);
+                       pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
                pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
                pstmt.setString(3, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
                pstmt.executeUpdate();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 2351a10..9b6aa97 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -19,12 +19,17 @@ package org.apache.cloudstack.framework.jobs.impl;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.TimeZone;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -241,7 +246,7 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
             for(Long id : wakeupList) {
                // TODO, we assume that all jobs in this category is API job 
only
                AsyncJobVO jobToWakeup = _jobDao.findById(id);
-                if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & 
AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0)
+                if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & 
AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
                    scheduleExecution(jobToWakeup, false);
             }
              
@@ -353,8 +358,9 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
        // TODO
        // this is a temporary solution to solve strange MySQL deadlock issue,
        // completeJoin() causes deadlock happens at async_job table
+        // I removed the temporary solution already.  I think my changes 
should fix the deadlock.
 
-/*     
+/*
        ------------------------
        LATEST DETECTED DEADLOCK
        ------------------------
@@ -387,25 +393,9 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
        0: len 8; hex 0000000000000009; asc         ;; 1: len 6; hex 
000005d8b0d7; asc       ;; 2: len 7; hex 00000009280110; asc     (  ;; 3: len 
8; hex 0000000000000002; asc         ;; 4: len 8; hex 0000000000000002; asc     
    ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc 
org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 
7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc 
{"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc     
;; 10: len 4; hex 80000001; asc     ;; 11: len 4; hex 80000000; asc     ;; 12: 
len 4; hex 80000000; asc     ;; 13: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc 
org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; 
asc    o{   ;; 15: len 8; hex 80001a6f7bb0d0a8; asc    o{   ;; 16: len 8; hex 
8000124f06cfd5b6; asc    O    ;; 17: len 8; hex 8000124f06cfd5b6; asc    O    
;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 
62313065306432342d336233352d343663622d386361622d623933623562; asc 
b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 
39353664383563632d383336622d346663612d623738622d646238343739; asc 
956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 
4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL 
NULL; 25: len 4; hex 80000000; asc     ;;
 
        *** WE ROLL BACK TRANSACTION (2)
-*/     
+*/
        
-       //
-       // TODO
-       // ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC is a hard-coded time out value, 
this value
-       // should actually be in sync with mysql settings
-       //
-       // TODO
-       // how to handle failures from locking?
-       
-       if(_jobDao.lockInLockTable(AsyncJob.Contants.SYNC_LOCK_NAME, 
ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC)) {
-               try {
                        _joinMapDao.completeJoin(joinJobId, joinStatus, 
joinResult, getMsid());
-               } finally {
-                       
_jobDao.unlockFromLockTable(AsyncJob.Contants.SYNC_LOCK_NAME);
-               }
-       } else {
-               s_logger.error("If this happens, it means too bad");
-       }
     }
     
     @Override
@@ -524,7 +514,7 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
                         s_logger.debug("Executing " + job);
                     }
 
-                    if ((getAndResetPendingSignals(job) & 
AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) {
+                    if ((getAndResetPendingSignals(job) & 
AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0) {
                        AsyncJobDispatcher jobDispatcher = 
getWakeupDispatcher(job);
                        if(jobDispatcher != null) {
                                jobDispatcher.runJob(job);
@@ -709,19 +699,13 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
                         }
                     }
               
-                       
if(_jobDao.lockInLockTable(AsyncJob.Contants.SYNC_LOCK_NAME, 
ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC)) {
-                               try {
-                                   List<Long> standaloneWakeupJobs = 
_joinMapDao.wakeupScan();
+                    List<Long> standaloneWakeupJobs = wakeupScan();
                                    for(Long jobId : standaloneWakeupJobs) {
                                        // TODO, we assume that all jobs in 
this category is API job only
                                        AsyncJobVO job = 
_jobDao.findById(jobId);
-                                       if (job != null && 
(job.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0)
+                                       if (job != null && 
(job.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
                                            scheduleExecution(job, false);
                                    }
-                               } finally {
-                                       
_jobDao.unlockFromLockTable(AsyncJob.Contants.SYNC_LOCK_NAME);
-                               }
-                       }
                 } catch(Throwable e) {
                     s_logger.error("Unexpected exception when trying to 
execute queue item, ", e);
                 } finally {
@@ -830,6 +814,92 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
         }
     }
 
+    @DB
+    protected List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
+        SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", 
joinedJobId);
+
+        List<Long> result = _joinMapDao.customSearch(joinJobSC, null);
+        if (result.size() != 0) {
+            Collections.sort(result);
+            Long[] ids = result.toArray(new Long[result.size()]);
+
+            SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", 
ids);
+            SearchCriteria<SyncQueueItemVO> queueItemsSC = 
QueueJobIdsSearch.create("contentIds", ids);
+
+            Transaction txn = Transaction.currentTxn();
+            txn.start();
+            AsyncJobVO job = _jobDao.createForUpdate();
+            job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+            _jobDao.update(job, jobsSC);
+
+            SyncQueueItemVO item = _queueItemDao.createForUpdate();
+            item.setLastProcessNumber(null);
+            item.setLastProcessMsid(null);
+            _queueItemDao.update(item, queueItemsSC);
+            txn.commit();
+        }
+        return _joinMapDao.findJobsToWake(joinedJobId);
+    }
+
+    @DB
+    protected List<Long> wakeupScan() {
+        List<Long> standaloneList = new ArrayList<Long>();
+
+        Date cutDate = DateUtil.currentGMTTime();
+
+        Transaction txn = Transaction.currentTxn();
+        PreparedStatement pstmt = null;
+        try {
+            txn.start();
+
+
+            //
+            // performance sensitive processing, do it in plain SQL
+            //
+            String sql = "UPDATE async_job SET job_pending_signals=? WHERE id 
IN " +
+                    "(SELECT job_id FROM async_job_join_map WHERE next_wakeup 
< ? AND expiration > ?)";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(3, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.executeUpdate();
+            pstmt.close();
+
+            sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, 
queue_proc_number=NULL WHERE content_id IN " +
+                    "(SELECT job_id FROM async_job_join_map WHERE next_wakeup 
< ? AND expiration > ?)";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.executeUpdate();
+            pstmt.close();
+
+            sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? 
AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            ResultSet rs = pstmt.executeQuery();
+            while (rs.next()) {
+                standaloneList.add(rs.getLong(1));
+            }
+            rs.close();
+            pstmt.close();
+
+            // update for next wake-up
+            sql = "UPDATE async_job_join_map SET 
next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE 
next_wakeup < ? AND expiration > ?";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.executeUpdate();
+            pstmt.close();
+
+            txn.commit();
+        } catch (SQLException e) {
+            s_logger.error("Unexpected exception", e);
+        }
+
+        return standaloneList;
+    }
+
     @Override
     public boolean configure(String name, Map<String, Object> params) throws 
ConfigurationException {
         _jobExpireSeconds = 
_configDepot.get(JobExpireMinutes).setMultiplier(60);
@@ -855,6 +925,8 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
         JoinJobSearch.selectField(JoinJobSearch.entity().getJobId());
         JoinJobSearch.done();
 
+        JoinJobTimeSearch
+
         JobIdsSearch = _jobDao.createSearchBuilder();
         JobIdsSearch.and(JobIdsSearch.entity().getId(), Op.IN, "ids").done();
 
@@ -877,34 +949,6 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
     }
 
     @Override
-    @DB
-    public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
-        SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", 
joinedJobId);
-
-        List<Long> result = _joinMapDao.customSearch(joinJobSC, null);
-        if (result.size() != 0) {
-            Collections.sort(result);
-            Long[] ids = result.toArray(new Long[result.size()]);
-
-            SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", 
ids);
-            SearchCriteria<SyncQueueItemVO> queueItemsSC = 
QueueJobIdsSearch.create("contentIds", ids);
-
-            Transaction txn = Transaction.currentTxn();
-            txn.start();
-            AsyncJobVO job = _jobDao.createForUpdate();
-            job.setPendingSignals(1);
-            _jobDao.update(job, jobsSC);
-
-            SyncQueueItemVO item = _queueItemDao.createForUpdate();
-            item.setLastProcessNumber(null);
-            item.setLastProcessMsid(null);
-            _queueItemDao.update(item, queueItemsSC);
-            txn.commit();
-        }
-        return _joinMapDao.findJobsToWake(joinedJobId);
-    }
-
-    @Override
     public void onManagementNodeJoined(List<? extends ManagementServerHost> 
nodeList, long selfNodeId) {
     }
 
@@ -960,6 +1004,7 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
     private SearchBuilder<AsyncJobVO> JobIdsSearch;
     private SearchBuilder<SyncQueueItemVO> QueueJobIdsSearch;
     private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobIdsSearch;
+    private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobTimeSearch;
 
     protected AsyncJobManagerImpl() {
 

Reply via email to