Updated Branches:
  refs/heads/vmsync 2210c1027 -> c7530dbd7

Hook job monitoring


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/c7530dbd
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/c7530dbd
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/c7530dbd

Branch: refs/heads/vmsync
Commit: c7530dbd70b256472c834158d4cbcf45d6235c50
Parents: 2210c10
Author: Kelven Yang <kelv...@gmail.com>
Authored: Sun May 12 18:15:47 2013 -0700
Committer: Kelven Yang <kelv...@gmail.com>
Committed: Sun May 12 18:15:47 2013 -0700

----------------------------------------------------------------------
 server/src/com/cloud/async/AsyncJobConstants.java  |    2 ++
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |   14 ++++++++++----
 server/src/com/cloud/async/AsyncJobMonitor.java    |    4 +++-
 .../test/com/cloud/vm/VmWorkTestConfiguration.java |    6 ++++++
 4 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobConstants.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobConstants.java b/server/src/com/cloud/async/AsyncJobConstants.java
index bbebad6..6081d0c 100644
--- a/server/src/com/cloud/async/AsyncJobConstants.java
+++ b/server/src/com/cloud/async/AsyncJobConstants.java
@@ -24,6 +24,8 @@ public interface AsyncJobConstants {
        public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
        public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
        
+       public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
+       
        // Although we may have detailed masks for each individual wakeup event, i.e.
        // periodical timer, matched topic from message bus, it seems that we don't
        // need to distinguish them to such level. Therefore, only one wakeup signal

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index f23f3d7..cde184a 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -92,6 +92,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     @Inject private AsyncJobJoinMapDao _joinMapDao;
     @Inject private List<AsyncJobDispatcher> _jobDispatchers;
     @Inject private MessageBus _messageBus;
+    @Inject private AsyncJobMonitor _jobMonitor;
 
     // property
     private String defaultDispatcher;
@@ -524,6 +525,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                         s_logger.warn("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
                     }
                     
+                    _jobMonitor.registerActiveTask(job.getId());
                     AsyncJobExecutionContext.setCurrentExecutionContext(
                        (AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job))
                     );
@@ -560,6 +562,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                } finally {
                        // guard final clause as well
                     try {
+                       AsyncJobVO jobToUpdate = _jobDao.findById(job.getId());
+                       jobToUpdate.setExecutingMsid(null);
+                       _jobDao.update(job.getId(), jobToUpdate);
+                       
                        if (job.getSyncSource() != null) {
                             _queueMgr.purgeItem(job.getSyncSource().getId());
                             checkQueue(job.getSyncSource().getQueueId());
@@ -569,6 +575,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                        // clean execution environment
                        //
                         AsyncJobExecutionContext.setCurrentExecutionContext(null);
+                        _jobMonitor.unregisterActiveTask(job.getId());
                        
                        try {
                                JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId());
@@ -608,7 +615,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             job.setSyncSource(item);
             
             job.setExecutingMsid(getMsid());
-            job.setCompleteMsid(getMsid());
             _jobDao.update(job.getId(), job);
 
             try {
@@ -616,10 +622,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             } catch(RejectedExecutionException e) {
                 s_logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn");
                 _queueMgr.returnItem(item.getId());
-            } finally {
+                
                job.setExecutingMsid(null);
                _jobDao.update(job.getId(), job);
-            }
+            } 
 
         } else {
             if(s_logger.isDebugEnabled()) {
@@ -838,7 +844,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             int poolSize = (cloudMaxActive * 2) / 3;
 
             s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
-            _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory("Job-Executor"));
+            _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX));
         } catch (final Exception e) {
             throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobMonitor.java b/server/src/com/cloud/async/AsyncJobMonitor.java
index bd5b2cd..98c340b 100644
--- a/server/src/com/cloud/async/AsyncJobMonitor.java
+++ b/server/src/com/cloud/async/AsyncJobMonitor.java
@@ -105,10 +105,12 @@ public class AsyncJobMonitor extends ManagerBase {
                return true;
        }
        
-       public void registerActiveTask(long jobId, long threadId, boolean fromPoolThread) {
+       public void registerActiveTask(long jobId) {
                synchronized(this) {
                        assert(_activeTasks.get(jobId) == null);
                        
+                       long threadId = Thread.currentThread().getId();
+                       boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
                        ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread);
                        _activeTasks.put(jobId, record);
                        if(fromPoolThread)

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/test/com/cloud/vm/VmWorkTestConfiguration.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
index 67a3c00..93aa41a 100644
--- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java
+++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
@@ -21,6 +21,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import com.cloud.api.ApiDispatcher;
+import com.cloud.async.AsyncJobMonitor;
 import com.cloud.async.SyncQueueManager;
 import com.cloud.async.SyncQueueManagerImpl;
 import com.cloud.async.dao.AsyncJobDao;
@@ -123,4 +124,9 @@ public class VmWorkTestConfiguration {
        public VMInstanceDao vmInstanceDao() {
                return Mockito.mock(VMInstanceDao.class);
        }
+       
+       @Bean
+       public AsyncJobMonitor jobMonitor() {
+               return Mockito.mock(AsyncJobMonitor.class);
+       }
 }

Reply via email to