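Deadlock fix: remove the global SYNC_LOCK_NAME lock around completeJoin() and the wakeup scan in AsyncJobManagerImpl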
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/0233044b Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/0233044b Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/0233044b Branch: refs/heads/vmsync Commit: 0233044b20bab0e0f7822660aa9e08bf530409ee Parents: c6ba4a2 Author: Alex Huang <[email protected]> Authored: Thu Jun 27 14:37:59 2013 -0700 Committer: Alex Huang <[email protected]> Committed: Thu Jun 27 14:37:59 2013 -0700 ---------------------------------------------------------------------- .../cloudstack/framework/jobs/AsyncJob.java | 2 +- .../framework/jobs/AsyncJobManager.java | 2 - .../jobs/dao/AsyncJobJoinMapDaoImpl.java | 2 +- .../jobs/impl/AsyncJobManagerImpl.java | 157 ++++++++++++------- 4 files changed, 103 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java index 2ed75a9..61fb396 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java @@ -32,7 +32,7 @@ public interface AsyncJob extends JobInfo { public static final String JOB_STATE = "job.state"; } - public static interface Contants { + public static interface Constants { // Although we may have detailed masks for each individual wakeup event, i.e. 
// periodical timer, matched topic from message bus, it seems that we don't http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java index 440188a..bc06101 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java @@ -37,8 +37,6 @@ public interface AsyncJobManager extends Manager { void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, String result); - List<Long> wakeupByJoinedJobCompletion(long joinedJobId); - void updateAsyncJobStatus(long jobId, int processStatus, String resultObject); void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId); void logJobJournal(long jobId, AsyncJob.JournalType journalType, String http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java index ba6cfbc..fa3b14b 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java @@ -169,7 +169,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " + "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? 
AND expiration > ?)"; pstmt = txn.prepareStatement(sql); - pstmt.setInt(1, AsyncJob.Contants.SIGNAL_MASK_WAKEUP); + pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP); pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); pstmt.executeUpdate(); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 2351a10..9b6aa97 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -19,12 +19,17 @@ package org.apache.cloudstack.framework.jobs.impl; import java.io.File; import java.io.FileInputStream; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; +import java.util.TimeZone; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; @@ -241,7 +246,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, for(Long id : wakeupList) { // TODO, we assume that all jobs in this category is API job only AsyncJobVO jobToWakeup = _jobDao.findById(id); - if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) + if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & 
AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0) scheduleExecution(jobToWakeup, false); } @@ -353,8 +358,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, // TODO // this is a temporary solution to solve strange MySQL deadlock issue, // completeJoin() causes deadlock happens at async_job table + // I removed the temporary solution already. I think my changes should fix the deadlock. -/* +/* ------------------------ LATEST DETECTED DEADLOCK ------------------------ @@ -387,25 +393,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, 0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQ L NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; *** WE ROLL BACK TRANSACTION (2) -*/ +*/ - // - // TODO - // 
ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC is a hard-coded time out value, this value - // should actually be in sync with mysql settings - // - // TODO - // how to handle failures from locking? - - if(_jobDao.lockInLockTable(AsyncJob.Contants.SYNC_LOCK_NAME, ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC)) { - try { _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid()); - } finally { - _jobDao.unlockFromLockTable(AsyncJob.Contants.SYNC_LOCK_NAME); - } - } else { - s_logger.error("If this happens, it means too bad"); - } } @Override @@ -524,7 +514,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, s_logger.debug("Executing " + job); } - if ((getAndResetPendingSignals(job) & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) { + if ((getAndResetPendingSignals(job) & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0) { AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job); if(jobDispatcher != null) { jobDispatcher.runJob(job); @@ -709,19 +699,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } } - if(_jobDao.lockInLockTable(AsyncJob.Contants.SYNC_LOCK_NAME, ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC)) { - try { - List<Long> standaloneWakeupJobs = _joinMapDao.wakeupScan(); + List<Long> standaloneWakeupJobs = wakeupScan(); for(Long jobId : standaloneWakeupJobs) { // TODO, we assume that all jobs in this category is API job only AsyncJobVO job = _jobDao.findById(jobId); - if (job != null && (job.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) + if (job != null && (job.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0) scheduleExecution(job, false); } - } finally { - _jobDao.unlockFromLockTable(AsyncJob.Contants.SYNC_LOCK_NAME); - } - } } catch(Throwable e) { s_logger.error("Unexpected exception when trying to execute queue item, ", e); } finally { @@ -830,6 +814,92 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } } + @DB + protected 
List<Long> wakeupByJoinedJobCompletion(long joinedJobId) { + SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", joinedJobId); + + List<Long> result = _joinMapDao.customSearch(joinJobSC, null); + if (result.size() != 0) { + Collections.sort(result); + Long[] ids = result.toArray(new Long[result.size()]); + + SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids); + SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids); + + Transaction txn = Transaction.currentTxn(); + txn.start(); + AsyncJobVO job = _jobDao.createForUpdate(); + job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP); + _jobDao.update(job, jobsSC); + + SyncQueueItemVO item = _queueItemDao.createForUpdate(); + item.setLastProcessNumber(null); + item.setLastProcessMsid(null); + _queueItemDao.update(item, queueItemsSC); + txn.commit(); + } + return _joinMapDao.findJobsToWake(joinedJobId); + } + + @DB + protected List<Long> wakeupScan() { + List<Long> standaloneList = new ArrayList<Long>(); + + Date cutDate = DateUtil.currentGMTTime(); + + Transaction txn = Transaction.currentTxn(); + PreparedStatement pstmt = null; + try { + txn.start(); + + + // + // performance sensitive processing, do it in plain SQL + // + String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " + + "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; + pstmt = txn.prepareStatement(sql); + pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.executeUpdate(); + pstmt.close(); + + sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + + "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? 
AND expiration > ?)"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.executeUpdate(); + pstmt.close(); + + sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) { + standaloneList.add(rs.getLong(1)); + } + rs.close(); + pstmt.close(); + + // update for next wake-up + sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.executeUpdate(); + pstmt.close(); + + txn.commit(); + } catch (SQLException e) { + s_logger.error("Unexpected exception", e); + } + + return standaloneList; + } + @Override public boolean configure(String name, Map<String, Object> params) throws ConfigurationException { _jobExpireSeconds = _configDepot.get(JobExpireMinutes).setMultiplier(60); @@ -855,6 +925,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, JoinJobSearch.selectField(JoinJobSearch.entity().getJobId()); JoinJobSearch.done(); + JoinJobTimeSearch + JobIdsSearch = _jobDao.createSearchBuilder(); JobIdsSearch.and(JobIdsSearch.entity().getId(), Op.IN, "ids").done(); @@ -877,34 +949,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } @Override - @DB - public 
List<Long> wakeupByJoinedJobCompletion(long joinedJobId) { - SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", joinedJobId); - - List<Long> result = _joinMapDao.customSearch(joinJobSC, null); - if (result.size() != 0) { - Collections.sort(result); - Long[] ids = result.toArray(new Long[result.size()]); - - SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids); - SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids); - - Transaction txn = Transaction.currentTxn(); - txn.start(); - AsyncJobVO job = _jobDao.createForUpdate(); - job.setPendingSignals(1); - _jobDao.update(job, jobsSC); - - SyncQueueItemVO item = _queueItemDao.createForUpdate(); - item.setLastProcessNumber(null); - item.setLastProcessMsid(null); - _queueItemDao.update(item, queueItemsSC); - txn.commit(); - } - return _joinMapDao.findJobsToWake(joinedJobId); - } - - @Override public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) { } @@ -960,6 +1004,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private SearchBuilder<AsyncJobVO> JobIdsSearch; private SearchBuilder<SyncQueueItemVO> QueueJobIdsSearch; private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobIdsSearch; + private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobTimeSearch; protected AsyncJobManagerImpl() {
