Updated Branches: refs/heads/vmsync 2210c1027 -> c7530dbd7
Hook job monitoring Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/c7530dbd Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/c7530dbd Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/c7530dbd Branch: refs/heads/vmsync Commit: c7530dbd70b256472c834158d4cbcf45d6235c50 Parents: 2210c10 Author: Kelven Yang <kelv...@gmail.com> Authored: Sun May 12 18:15:47 2013 -0700 Committer: Kelven Yang <kelv...@gmail.com> Committed: Sun May 12 18:15:47 2013 -0700 ---------------------------------------------------------------------- server/src/com/cloud/async/AsyncJobConstants.java | 2 ++ .../src/com/cloud/async/AsyncJobManagerImpl.java | 14 ++++++++++---- server/src/com/cloud/async/AsyncJobMonitor.java | 4 +++- .../test/com/cloud/vm/VmWorkTestConfiguration.java | 6 ++++++ 4 files changed, 21 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobConstants.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobConstants.java b/server/src/com/cloud/async/AsyncJobConstants.java index bbebad6..6081d0c 100644 --- a/server/src/com/cloud/async/AsyncJobConstants.java +++ b/server/src/com/cloud/async/AsyncJobConstants.java @@ -24,6 +24,8 @@ public interface AsyncJobConstants { public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher"; public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread"; + public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor"; + // Although we may have detailed masks for each individual wakeup event, i.e. // periodical timer, matched topic from message bus, it seems that we don't // need to distinguish them to such level. Therefore, only one wakeup signal http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index f23f3d7..cde184a 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -92,6 +92,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private AsyncJobJoinMapDao _joinMapDao; @Inject private List<AsyncJobDispatcher> _jobDispatchers; @Inject private MessageBus _messageBus; + @Inject private AsyncJobMonitor _jobMonitor; // property private String defaultDispatcher; @@ -524,6 +525,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, s_logger.warn("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); } + _jobMonitor.registerActiveTask(job.getId()); AsyncJobExecutionContext.setCurrentExecutionContext( (AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job)) ); @@ -560,6 +562,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } finally { // guard final clause as well try { + AsyncJobVO jobToUpdate = _jobDao.findById(job.getId()); + jobToUpdate.setExecutingMsid(null); + _jobDao.update(job.getId(), jobToUpdate); + if (job.getSyncSource() != null) { _queueMgr.purgeItem(job.getSyncSource().getId()); checkQueue(job.getSyncSource().getQueueId()); @@ -569,6 +575,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, // clean execution environment // AsyncJobExecutionContext.setCurrentExecutionContext(null); + _jobMonitor.unregisterActiveTask(job.getId()); try { JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId()); @@ -608,7 +615,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setSyncSource(item); job.setExecutingMsid(getMsid()); - job.setCompleteMsid(getMsid()); _jobDao.update(job.getId(), job); try { @@ -616,10 +622,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } catch(RejectedExecutionException e) { s_logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn"); _queueMgr.returnItem(item.getId()); - } finally { + job.setExecutingMsid(null); _jobDao.update(job.getId(), job); - } + } } else { if(s_logger.isDebugEnabled()) { @@ -838,7 +844,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, int poolSize = (cloudMaxActive * 2) / 3; s_logger.info("Start AsyncJobManager thread pool in size " + poolSize); - _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory("Job-Executor")); + _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX)); } catch (final Exception e) { throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl"); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobMonitor.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobMonitor.java b/server/src/com/cloud/async/AsyncJobMonitor.java index bd5b2cd..98c340b 100644 --- a/server/src/com/cloud/async/AsyncJobMonitor.java +++ b/server/src/com/cloud/async/AsyncJobMonitor.java @@ -105,10 +105,12 @@ public class AsyncJobMonitor extends ManagerBase { return true; } - public void registerActiveTask(long jobId, long threadId, boolean fromPoolThread) { + public void registerActiveTask(long jobId) { synchronized(this) { assert(_activeTasks.get(jobId) == null); + long threadId = Thread.currentThread().getId(); + boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX); ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread); _activeTasks.put(jobId, record); if(fromPoolThread) http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/test/com/cloud/vm/VmWorkTestConfiguration.java ---------------------------------------------------------------------- diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java index 67a3c00..93aa41a 100644 --- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java +++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java @@ -21,6 +21,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.cloud.api.ApiDispatcher; +import com.cloud.async.AsyncJobMonitor; import com.cloud.async.SyncQueueManager; import com.cloud.async.SyncQueueManagerImpl; import com.cloud.async.dao.AsyncJobDao; @@ -123,4 +124,9 @@ public class VmWorkTestConfiguration { public VMInstanceDao vmInstanceDao() { return Mockito.mock(VMInstanceDao.class); } + + @Bean + public AsyncJobMonitor jobMonitor() { + return Mockito.mock(AsyncJobMonitor.class); + } }