Updated Branches:
  refs/heads/vmsync d0a7ca082 -> f1ba6586b

Add Predicate and MesageDetector for message bus


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/f1ba6586
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/f1ba6586
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/f1ba6586

Branch: refs/heads/vmsync
Commit: f1ba6586b963f212427531c4512f4556db5e30e4
Parents: d0a7ca0
Author: Kelven Yang <kelv...@gmail.com>
Authored: Thu Apr 25 17:57:48 2013 -0700
Committer: Kelven Yang <kelv...@gmail.com>
Committed: Thu Apr 25 17:57:48 2013 -0700

----------------------------------------------------------------------
 .../framework/messagebus/MessageDetector.java      |   75 +++++++++++++++
 .../cloudstack/messagebus/TestMessageBus.java      |   40 ++++++++
 server/pom.xml                                     |    5 +-
 server/src/com/cloud/async/AsyncJobManager.java    |    5 +
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |   25 +++++-
 .../com/cloud/ha/HighAvailabilityManagerImpl.java  |    2 +-
 .../com/cloud/vm/VirtualMachineManagerImpl.java    |   26 ++++--
 .../test/com/cloud/async/TestAsyncJobManager.java  |   27 +++++
 server/test/resources/AsyncJobTestContext.xml      |    2 +
 utils/src/com/cloud/utils/Predicate.java           |   21 ++++
 10 files changed, 216 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
new file mode 100644
index 0000000..7a7a34a
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messagebus;
+
+public class MessageDetector implements MessageSubscriber {
+       
+       private MessageBus _messageBus;
+       private String[] _subjects;
+       
+       private volatile boolean _signalled = false;
+       
+       public MessageDetector() {
+               _messageBus = null;
+               _subjects = null;
+       }
+       
+       public boolean waitAny(long timeoutInMiliseconds) {
+               _signalled = false;
+               synchronized(this) {
+                       try {
+                               wait(timeoutInMiliseconds);
+                       } catch (InterruptedException e) {
+                       }
+               }
+               return _signalled;
+       }
+       
+       public void open(MessageBus messageBus, String[] subjects) {
+               assert(messageBus != null);
+               assert(subjects != null);
+               
+               _messageBus = messageBus;
+               _subjects = subjects;
+               
+               if(subjects != null) {
+                       for(String subject : subjects) {
+                               messageBus.subscribe(subject, this);
+                       }
+               }
+       }
+       
+       public void close() {
+               if(_subjects != null) {
+                       assert(_messageBus != null);
+                       
+                       for(String subject : _subjects) {
+                               _messageBus.unsubscribe(subject, this);
+                       }
+               }
+       }
+
+       @Override
+       public void onPublishMessage(String senderAddress, String subject, 
Object args) {
+               synchronized(this) {
+                       _signalled = true;
+                       notifyAll();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java 
b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
index dabfdd3..9f769ac 100644
--- a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
+++ b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
@@ -23,6 +23,7 @@ import javax.inject.Inject;
 import junit.framework.TestCase;
 
 import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDetector;
 import org.apache.cloudstack.framework.messagebus.MessageSubscriber;
 import org.apache.cloudstack.framework.messagebus.PublishScope;
 import org.junit.Assert;
@@ -113,4 +114,43 @@ public class TestMessageBus extends TestCase {
                
                _messageBus.clearAll();
        }
+       
+       @Test
+       public void testMessageDetector() {
+               MessageDetector detector = new MessageDetector();
+               detector.open(_messageBus, new String[] {"VM", "Host"});
+               
+               Thread thread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               for(int i = 0; i < 2; i++) {
+                                       try {
+                                               Thread.sleep(3000);
+                                       } catch (InterruptedException e) {
+                                       }
+                                       _messageBus.publish(null, "Host", 
PublishScope.GLOBAL, null);
+                               }
+                       }
+               });
+               thread.start();
+               
+               try {
+                       int count = 0;
+                       while(count < 2) {
+                               if(detector.waitAny(1000)) {
+                                       System.out.println("Detected signal on 
bus");
+                                       count++;
+                               } else {
+                                       System.out.println("Waiting timed out");
+                               }
+                       }
+               } finally {
+                       detector.close();
+               }
+               
+               try {
+                       thread.join();
+               } catch (InterruptedException e) {
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 36e4c08..5d1f258 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -151,9 +151,8 @@
           <argLine>-Xmx1024m</argLine>
           <excludes>
             <exclude>%regex[.*[0-9]*To[0-9]*.*Test.*]</exclude>
-              
<exclude>com/cloud/upgrade/AdvanceZone223To224UpgradeTest</exclude>
-              
<exclude>com/cloud/upgrade/AdvanceZone217To224UpgradeTest</exclude>
-            <exclude>com/cloud/async/*</exclude>
+            <exclude>com/cloud/upgrade/AdvanceZone223To224UpgradeTest</exclude>
+            <exclude>com/cloud/upgrade/AdvanceZone217To224UpgradeTest</exclude>
             <exclude>com/cloud/cluster/*</exclude>
             <exclude>com/cloud/snapshot/*</exclude>
             <exclude>com/cloud/storage/dao/*</exclude>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/src/com/cloud/async/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManager.java 
b/server/src/com/cloud/async/AsyncJobManager.java
index 76f5600..668528b 100644
--- a/server/src/com/cloud/async/AsyncJobManager.java
+++ b/server/src/com/cloud/async/AsyncJobManager.java
@@ -19,6 +19,8 @@ package com.cloud.async;
 import java.util.List;
 
 import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
+
+import com.cloud.utils.Predicate;
 import com.cloud.utils.component.Manager;
 
 public interface AsyncJobManager extends Manager {
@@ -39,6 +41,9 @@ public interface AsyncJobManager extends Manager {
     
     public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long 
syncObjId, long queueSizeLimit);
     
+    public boolean waitAndCheck(String[] wakupSubjects, long 
checkIntervalInMilliSeconds, 
+       long timeoutInMiliseconds, Predicate predicate);
+    
     /**
      * Queries for the status or final result of an async job.
      * @param cmd the command that specifies the job id

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java 
b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index c69877e..49e1958 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -36,6 +36,8 @@ import javax.naming.ConfigurationException;
 import org.apache.cloudstack.api.ApiErrorCode;
 import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
 import org.apache.cloudstack.api.response.ExceptionResponse;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDetector;
 import org.apache.log4j.Logger;
 import org.apache.log4j.NDC;
 
@@ -54,6 +56,7 @@ import com.cloud.user.User;
 import com.cloud.user.UserContext;
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.Predicate;
 import com.cloud.utils.PropertiesUtil;
 import com.cloud.utils.component.ManagerBase;
 import com.cloud.utils.concurrency.NamedThreadFactory;
@@ -81,6 +84,7 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
     @Inject private AsyncJobDao _jobDao;
     @Inject private ConfigurationDao _configDao;
     @Inject private List<AsyncJobDispatcher> _jobDispatchers;
+    @Inject private MessageBus _messageBus;
 
     // property
     private String defaultDispatcher;
@@ -489,7 +493,26 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
             _queueMgr.purgeItem(executionContext.getSyncSource().getId());
             checkQueue(executionContext.getSyncSource().getQueueId());
        }
-    }
+    }
+    
+    public boolean waitAndCheck(String[] wakupSubjects, long 
checkIntervalInMilliSeconds, 
+        long timeoutInMiliseconds, Predicate predicate) {
+       
+       MessageDetector msgDetector = new MessageDetector();
+       msgDetector.open(_messageBus, wakupSubjects);
+       try {
+               long startTick = System.currentTimeMillis();
+               while(System.currentTimeMillis() - startTick < 
timeoutInMiliseconds) {
+                       msgDetector.waitAny(checkIntervalInMilliSeconds);
+                       if(predicate.checkCondition())
+                               return true;
+               }
+       } finally {
+               msgDetector.close();
+       }
+       
+       return false;
+    }
 
     private void checkQueue(long queueId) {
         while(true) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java 
b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
index 25c5a04..5edca3b 100755
--- a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
+++ b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
@@ -624,7 +624,7 @@ public class HighAvailabilityManagerImpl extends 
ManagerBase implements HighAvai
                 return null;
             }
         } catch (final AgentUnavailableException e) {
-            s_logger.debug("Agnet is not available" + e.getMessage());
+            s_logger.  debug("Agnet is not available" + e.getMessage());
         } catch (OperationTimedoutException e) {
             s_logger.debug("operation timed out: " + e.getMessage());
         } catch (ConcurrentOperationException e) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java 
b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 7b849a9..353a024 100755
--- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -38,6 +38,9 @@ import javax.naming.ConfigurationException;
 
 import com.cloud.capacity.CapacityManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
+import org.apache.cloudstack.messagebus.SubjectConstants;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 
 import com.cloud.dc.*;
@@ -237,6 +240,9 @@ public class VirtualMachineManagerImpl extends ManagerBase 
implements VirtualMac
     protected ConfigurationDao _configDao;
     @Inject
     VolumeManager volumeMgr;
+    
+    @Inject protected MessageBus _messageBus;
+    @Inject protected VirtualMachinePowerStateSync _syncMgr;
 
     Map<VirtualMachine.Type, VirtualMachineGuru<? extends VMInstanceVO>> 
_vmGurus = new HashMap<VirtualMachine.Type, VirtualMachineGuru<? extends 
VMInstanceVO>>();
     protected StateMachine2<State, VirtualMachine.Event, VirtualMachine> 
_stateMachine;
@@ -448,7 +454,7 @@ public class VirtualMachineManagerImpl extends ManagerBase 
implements VirtualMac
         _nodeId = _clusterMgr.getManagementNodeId();
 
         _agentMgr.registerForHostEvents(this, true, true, true);
-
+      
         return true;
     }
 
@@ -2294,7 +2300,16 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
     public boolean processDisconnect(long agentId, Status state) {
         return true;
     }
-
+    
+    public void processConnect(HostVO agent, StartupCommand cmd, boolean 
forRebalance) throws ConnectionException {
+        if (!(cmd instanceof StartupRoutingCommand)) {
+            return;
+        }
+        
+        _syncMgr.resetHostSyncState(agent.getId());
+    }
+    
+/*    
     @Override
     public void processConnect(HostVO agent, StartupCommand cmd, boolean 
forRebalance) throws ConnectionException {
         if (!(cmd instanceof StartupRoutingCommand)) {
@@ -2317,12 +2332,10 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
         if (agent.getHypervisorType() == HypervisorType.XenServer) { // only 
for Xen
             StartupRoutingCommand startup = (StartupRoutingCommand) cmd;
 
-/* TODO            
             HashMap<String, Pair<String, State>> allStates = 
startup.getClusterVMStateChanges();
             if (allStates != null){
                 this.fullSync(clusterId, allStates);
             }
-*/
             // initiate the cron job
             ClusterSyncCommand syncCmd = new 
ClusterSyncCommand(Integer.parseInt(Config.ClusterDeltaSyncInterval.getDefaultValue()),
 clusterId);
             try {
@@ -2335,7 +2348,6 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
         else { // for others KVM and VMWare 
             StartupRoutingCommand startup = (StartupRoutingCommand) cmd;
 
-/*
             Commands commands = fullHostSync(agentId, startup);
 
             if (commands.size() > 0) {
@@ -2361,10 +2373,10 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
                     throw new ConnectionException(true, "Unable to sync", e);
                 }
             }
-*/
         }
     }
-
+*/
+    
     protected class TransitionTask implements Runnable {
         @Override
         public void run() {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/test/com/cloud/async/TestAsyncJobManager.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java 
b/server/test/com/cloud/async/TestAsyncJobManager.java
index a56e66f..1b23937 100644
--- a/server/test/com/cloud/async/TestAsyncJobManager.java
+++ b/server/test/com/cloud/async/TestAsyncJobManager.java
@@ -28,6 +28,7 @@ import org.mockito.Mockito;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
+import com.cloud.async.AsyncJobManager;
 import com.cloud.cluster.ClusterManager;
 import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.db.Transaction;
@@ -39,6 +40,7 @@ public class TestAsyncJobManager extends TestCase {
 
     @Inject AsyncJobManager asyncMgr;
     @Inject ClusterManager clusterMgr;
+    @Inject MessageBus messageBus;
 
     @Before                                                  
     public void setUp() {                                    
@@ -55,5 +57,30 @@ public class TestAsyncJobManager extends TestCase {
     
     @Test
     public void test() {
+               Thread thread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               for(int i = 0; i < 2; i++) {
+                                       try {
+                                               Thread.sleep(1000);
+                                       } catch (InterruptedException e) {
+                                       }
+                                       messageBus.publish(null, "VM", 
PublishScope.GLOBAL, null);
+                               }
+                       }
+               });
+               thread.start();
+       
+       asyncMgr.waitAndCheck("VM", 5000, 10000, new Predicate() {
+               public boolean checkCondition() {
+                       s_logger.info("Check condition to exit");
+                       return false;
+               }
+       });
+       
+       try {
+               thread.join();
+       } catch(InterruptedException e) {
+       }
     }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/test/resources/AsyncJobTestContext.xml
----------------------------------------------------------------------
diff --git a/server/test/resources/AsyncJobTestContext.xml 
b/server/test/resources/AsyncJobTestContext.xml
index 5e62160..54ce0cd 100644
--- a/server/test/resources/AsyncJobTestContext.xml
+++ b/server/test/resources/AsyncJobTestContext.xml
@@ -43,6 +43,8 @@
   <bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher">
     <property name="name" value="ApiAsyncJobDispatcher" />
   </bean>
+  
+  <bean id="messageBus" class = 
"org.apache.cloudstack.framework.messagebus.MessageBusBase" />
 
   <bean class="com.cloud.async.AsyncJobTestConfiguration" />
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/utils/src/com/cloud/utils/Predicate.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/Predicate.java 
b/utils/src/com/cloud/utils/Predicate.java
new file mode 100644
index 0000000..ddf0425
--- /dev/null
+++ b/utils/src/com/cloud/utils/Predicate.java
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.utils;
+
+public interface Predicate {
+       boolean checkCondition();
+}

Reply via email to