Updated Branches: refs/heads/vmsync d0a7ca082 -> f1ba6586b
Add Predicate and MesageDetector for message bus Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/f1ba6586 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/f1ba6586 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/f1ba6586 Branch: refs/heads/vmsync Commit: f1ba6586b963f212427531c4512f4556db5e30e4 Parents: d0a7ca0 Author: Kelven Yang <kelv...@gmail.com> Authored: Thu Apr 25 17:57:48 2013 -0700 Committer: Kelven Yang <kelv...@gmail.com> Committed: Thu Apr 25 17:57:48 2013 -0700 ---------------------------------------------------------------------- .../framework/messagebus/MessageDetector.java | 75 +++++++++++++++ .../cloudstack/messagebus/TestMessageBus.java | 40 ++++++++ server/pom.xml | 5 +- server/src/com/cloud/async/AsyncJobManager.java | 5 + .../src/com/cloud/async/AsyncJobManagerImpl.java | 25 +++++- .../com/cloud/ha/HighAvailabilityManagerImpl.java | 2 +- .../com/cloud/vm/VirtualMachineManagerImpl.java | 26 ++++-- .../test/com/cloud/async/TestAsyncJobManager.java | 27 +++++ server/test/resources/AsyncJobTestContext.xml | 2 + utils/src/com/cloud/utils/Predicate.java | 21 ++++ 10 files changed, 216 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java new file mode 100644 index 0000000..7a7a34a --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messagebus; + +public class MessageDetector implements MessageSubscriber { + + private MessageBus _messageBus; + private String[] _subjects; + + private volatile boolean _signalled = false; + + public MessageDetector() { + _messageBus = null; + _subjects = null; + } + + public boolean waitAny(long timeoutInMiliseconds) { + _signalled = false; + synchronized(this) { + try { + wait(timeoutInMiliseconds); + } catch (InterruptedException e) { + } + } + return _signalled; + } + + public void open(MessageBus messageBus, String[] subjects) { + assert(messageBus != null); + assert(subjects != null); + + _messageBus = messageBus; + _subjects = subjects; + + if(subjects != null) { + for(String subject : subjects) { + messageBus.subscribe(subject, this); + } + } + } + + public void close() { + if(_subjects != null) { + assert(_messageBus != null); + + for(String subject : _subjects) { + _messageBus.unsubscribe(subject, this); + } + } + } + + @Override + public void onPublishMessage(String senderAddress, String subject, Object args) { + synchronized(this) { + _signalled = true; + notifyAll(); + } + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java ---------------------------------------------------------------------- diff --git a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java index dabfdd3..9f769ac 100644 --- a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java +++ b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java @@ -23,6 +23,7 @@ import javax.inject.Inject; import junit.framework.TestCase; import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageDetector; import org.apache.cloudstack.framework.messagebus.MessageSubscriber; import org.apache.cloudstack.framework.messagebus.PublishScope; import org.junit.Assert; @@ -113,4 +114,43 @@ public class TestMessageBus extends TestCase { _messageBus.clearAll(); } + + @Test + public void testMessageDetector() { + MessageDetector detector = new MessageDetector(); + detector.open(_messageBus, new String[] {"VM", "Host"}); + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + for(int i = 0; i < 2; i++) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + } + _messageBus.publish(null, "Host", PublishScope.GLOBAL, null); + } + } + }); + thread.start(); + + try { + int count = 0; + while(count < 2) { + if(detector.waitAny(1000)) { + System.out.println("Detected signal on bus"); + count++; + } else { + System.out.println("Waiting timed out"); + } + } + } finally { + detector.close(); + } + + try { + thread.join(); + } catch (InterruptedException e) { + } + } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index 36e4c08..5d1f258 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -151,9 +151,8 @@ <argLine>-Xmx1024m</argLine> <excludes> <exclude>%regex[.*[0-9]*To[0-9]*.*Test.*]</exclude> - <exclude>com/cloud/upgrade/AdvanceZone223To224UpgradeTest</exclude> - <exclude>com/cloud/upgrade/AdvanceZone217To224UpgradeTest</exclude> - <exclude>com/cloud/async/*</exclude> + <exclude>com/cloud/upgrade/AdvanceZone223To224UpgradeTest</exclude> + <exclude>com/cloud/upgrade/AdvanceZone217To224UpgradeTest</exclude> <exclude>com/cloud/cluster/*</exclude> <exclude>com/cloud/snapshot/*</exclude> <exclude>com/cloud/storage/dao/*</exclude> http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/src/com/cloud/async/AsyncJobManager.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManager.java b/server/src/com/cloud/async/AsyncJobManager.java index 76f5600..668528b 100644 --- a/server/src/com/cloud/async/AsyncJobManager.java +++ b/server/src/com/cloud/async/AsyncJobManager.java @@ -19,6 +19,8 @@ package com.cloud.async; import java.util.List; import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd; + +import com.cloud.utils.Predicate; import com.cloud.utils.component.Manager; public interface AsyncJobManager extends Manager { @@ -39,6 +41,9 @@ public interface AsyncJobManager extends Manager { public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit); + public boolean waitAndCheck(String[] wakupSubjects, long checkIntervalInMilliSeconds, + long timeoutInMiliseconds, Predicate predicate); + /** * Queries for the status or final result of an async job. * @param cmd the command that specifies the job id http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/src/com/cloud/async/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index c69877e..49e1958 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -36,6 +36,8 @@ import javax.naming.ConfigurationException; import org.apache.cloudstack.api.ApiErrorCode; import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd; import org.apache.cloudstack.api.response.ExceptionResponse; +import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageDetector; import org.apache.log4j.Logger; import org.apache.log4j.NDC; @@ -54,6 +56,7 @@ import com.cloud.user.User; import com.cloud.user.UserContext; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; +import com.cloud.utils.Predicate; import com.cloud.utils.PropertiesUtil; import com.cloud.utils.component.ManagerBase; import com.cloud.utils.concurrency.NamedThreadFactory; @@ -81,6 +84,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private AsyncJobDao _jobDao; @Inject private ConfigurationDao _configDao; @Inject private List<AsyncJobDispatcher> _jobDispatchers; + @Inject private MessageBus _messageBus; // property private String defaultDispatcher; @@ -489,7 +493,26 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, _queueMgr.purgeItem(executionContext.getSyncSource().getId()); checkQueue(executionContext.getSyncSource().getQueueId()); } - } + } + + public boolean waitAndCheck(String[] wakupSubjects, long checkIntervalInMilliSeconds, + long timeoutInMiliseconds, Predicate predicate) { + + MessageDetector msgDetector = new MessageDetector(); + msgDetector.open(_messageBus, wakupSubjects); + try { + long startTick = System.currentTimeMillis(); + while(System.currentTimeMillis() - startTick < timeoutInMiliseconds) { + msgDetector.waitAny(checkIntervalInMilliSeconds); + if(predicate.checkCondition()) + return true; + } + } finally { + msgDetector.close(); + } + + return false; + } private void checkQueue(long queueId) { while(true) { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java index 25c5a04..5edca3b 100755 --- a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java +++ b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java @@ -624,7 +624,7 @@ public class HighAvailabilityManagerImpl extends ManagerBase implements HighAvai return null; } } catch (final AgentUnavailableException e) { - s_logger.debug("Agnet is not available" + e.getMessage()); + s_logger. debug("Agnet is not available" + e.getMessage()); } catch (OperationTimedoutException e) { s_logger.debug("operation timed out: " + e.getMessage()); } catch (ConcurrentOperationException e) { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/src/com/cloud/vm/VirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java index 7b849a9..353a024 100755 --- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -38,6 +38,9 @@ import javax.naming.ConfigurationException; import com.cloud.capacity.CapacityManager; import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager; +import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageDispatcher; +import org.apache.cloudstack.messagebus.SubjectConstants; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import com.cloud.dc.*; @@ -237,6 +240,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac protected ConfigurationDao _configDao; @Inject VolumeManager volumeMgr; + + @Inject protected MessageBus _messageBus; + @Inject protected VirtualMachinePowerStateSync _syncMgr; Map<VirtualMachine.Type, VirtualMachineGuru<? extends VMInstanceVO>> _vmGurus = new HashMap<VirtualMachine.Type, VirtualMachineGuru<? extends VMInstanceVO>>(); protected StateMachine2<State, VirtualMachine.Event, VirtualMachine> _stateMachine; @@ -448,7 +454,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _nodeId = _clusterMgr.getManagementNodeId(); _agentMgr.registerForHostEvents(this, true, true, true); - + return true; } @@ -2294,7 +2300,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac public boolean processDisconnect(long agentId, Status state) { return true; } - + + public void processConnect(HostVO agent, StartupCommand cmd, boolean forRebalance) throws ConnectionException { + if (!(cmd instanceof StartupRoutingCommand)) { + return; + } + + _syncMgr.resetHostSyncState(agent.getId()); + } + +/* @Override public void processConnect(HostVO agent, StartupCommand cmd, boolean forRebalance) throws ConnectionException { if (!(cmd instanceof StartupRoutingCommand)) { @@ -2317,12 +2332,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (agent.getHypervisorType() == HypervisorType.XenServer) { // only for Xen StartupRoutingCommand startup = (StartupRoutingCommand) cmd; -/* TODO HashMap<String, Pair<String, State>> allStates = startup.getClusterVMStateChanges(); if (allStates != null){ this.fullSync(clusterId, allStates); } -*/ // initiate the cron job ClusterSyncCommand syncCmd = new ClusterSyncCommand(Integer.parseInt(Config.ClusterDeltaSyncInterval.getDefaultValue()), clusterId); try { @@ -2335,7 +2348,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac else { // for others KVM and VMWare StartupRoutingCommand startup = (StartupRoutingCommand) cmd; -/* Commands commands = fullHostSync(agentId, startup); if (commands.size() > 0) { @@ -2361,10 +2373,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new ConnectionException(true, "Unable to sync", e); } } -*/ } } - +*/ + protected class TransitionTask implements Runnable { @Override public void run() { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/test/com/cloud/async/TestAsyncJobManager.java ---------------------------------------------------------------------- diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java index a56e66f..1b23937 100644 --- a/server/test/com/cloud/async/TestAsyncJobManager.java +++ b/server/test/com/cloud/async/TestAsyncJobManager.java @@ -28,6 +28,7 @@ import org.mockito.Mockito; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import com.cloud.async.AsyncJobManager; import com.cloud.cluster.ClusterManager; import com.cloud.utils.component.ComponentContext; import com.cloud.utils.db.Transaction; @@ -39,6 +40,7 @@ public class TestAsyncJobManager extends TestCase { @Inject AsyncJobManager asyncMgr; @Inject ClusterManager clusterMgr; + @Inject MessageBus messageBus; @Before public void setUp() { @@ -55,5 +57,30 @@ public class TestAsyncJobManager extends TestCase { @Test public void test() { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + for(int i = 0; i < 2; i++) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + messageBus.publish(null, "VM", PublishScope.GLOBAL, null); + } + } + }); + thread.start(); + + asyncMgr.waitAndCheck("VM", 5000, 10000, new Predicate() { + public boolean checkCondition() { + s_logger.info("Check condition to exit"); + return false; + } + }); + + try { + thread.join(); + } catch(InterruptedException e) { + } } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/server/test/resources/AsyncJobTestContext.xml ---------------------------------------------------------------------- diff --git a/server/test/resources/AsyncJobTestContext.xml b/server/test/resources/AsyncJobTestContext.xml index 5e62160..54ce0cd 100644 --- a/server/test/resources/AsyncJobTestContext.xml +++ b/server/test/resources/AsyncJobTestContext.xml @@ -43,6 +43,8 @@ <bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher"> <property name="name" value="ApiAsyncJobDispatcher" /> </bean> + + <bean id="messageBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase" /> <bean class="com.cloud.async.AsyncJobTestConfiguration" /> http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f1ba6586/utils/src/com/cloud/utils/Predicate.java ---------------------------------------------------------------------- diff --git a/utils/src/com/cloud/utils/Predicate.java b/utils/src/com/cloud/utils/Predicate.java new file mode 100644 index 0000000..ddf0425 --- /dev/null +++ b/utils/src/com/cloud/utils/Predicate.java @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.utils; + +public interface Predicate { + boolean checkCondition(); +}