Updated Branches: refs/heads/vmsync 1f0186aaf -> fc0713fd5
Replace hard-coded job wakeup signal constants Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/fc0713fd Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/fc0713fd Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/fc0713fd Branch: refs/heads/vmsync Commit: fc0713fd5503d395632e240c2e047afe954fc630 Parents: 1f0186a Author: Kelven Yang <[email protected]> Authored: Wed Jun 26 13:45:09 2013 -0700 Committer: Kelven Yang <[email protected]> Committed: Wed Jun 26 13:57:03 2013 -0700 ---------------------------------------------------------------------- .../org/apache/cloudstack/framework/jobs/AsyncJob.java | 9 +++++++++ .../framework/jobs/dao/AsyncJobJoinMapDaoImpl.java | 11 +++++++---- .../framework/jobs/impl/AsyncJobManagerImpl.java | 11 +++-------- 3 files changed, 19 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/fc0713fd/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java index be92846..995eaaf 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java @@ -31,6 +31,15 @@ public interface AsyncJob extends JobInfo { public static final String JOB_HEARTBEAT = "job.heartbeat"; public static final String JOB_STATE = "job.state"; } + + public static interface Contants { + + // Although we may have detailed masks for each individual wakeup event, i.e. + // periodical timer, matched topic from message bus, it seems that we don't + // need to distinguish them to such level. Therefore, only one wakeup signal + // is defined + public static final int SIGNAL_MASK_WAKEUP = 1; + } @Override String getType(); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/fc0713fd/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java index 60dea03..8ea5073 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java @@ -26,6 +26,7 @@ import java.util.TimeZone; import org.apache.log4j.Logger; +import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; import org.apache.cloudstack.jobs.JobInfo; @@ -158,11 +159,12 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo // // performance sensitive processing, do it in plain SQL // - String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " + + String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " + "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; pstmt = txn.prepareStatement(sql); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setInt(1, AsyncJob.Contants.SIGNAL_MASK_WAKEUP); pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); pstmt.executeUpdate(); pstmt.close(); @@ -213,10 +215,11 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo // // performance sensitive processing, do it in plain SQL // - String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " + + String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " + "(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)"; pstmt = txn.prepareStatement(sql); - pstmt.setLong(1, joinedJobId); + pstmt.setInt(1, AsyncJob.Contants.SIGNAL_MASK_WAKEUP); + pstmt.setLong(2, joinedJobId); pstmt.executeUpdate(); pstmt.close(); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/fc0713fd/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index f65e25e..7b199ff 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -80,11 +80,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class); private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds - // Although we may have detailed masks for each individual wakeup event, i.e. - // periodical timer, matched topic from message bus, it seems that we don't - // need to distinguish them to such level. Therefore, only one wakeup signal - // is defined - public static final int SIGNAL_MASK_WAKEUP = 1; private static final int MAX_ONETIME_SCHEDULE_SIZE = 50; private static final int HEARTBEAT_INTERVAL = 2000; @@ -235,7 +230,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, for(Long id : wakeupList) { // TODO, we assume that all jobs in this category is API job only AsyncJobVO jobToWakeup = _jobDao.findById(id); - if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0) + if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) scheduleExecution(jobToWakeup, false); } @@ -462,7 +457,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, s_logger.debug("Executing " + job); } - if ((getAndResetPendingSignals(job) & SIGNAL_MASK_WAKEUP) != 0) { + if ((getAndResetPendingSignals(job) & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) { AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job); if(jobDispatcher != null) { jobDispatcher.runJob(job); @@ -651,7 +646,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, for(Long jobId : standaloneWakeupJobs) { // TODO, we assume that all jobs in this category is API job only AsyncJobVO job = _jobDao.findById(jobId); - if (job != null && (job.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0) + if (job != null && (job.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) scheduleExecution(job, false); } } catch(Throwable e) {
