Author: agazzarini Date: Sat Feb 14 10:49:38 2009 New Revision: 744478 URL: http://svn.apache.org/viewvc?rev=744478&view=rev Log: QPID-1664 : QMan Asynchronous notification capability
Added: qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java Modified: qpid/trunk/qpid/java/management/client/etc/qman-config.xml qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml Modified: qpid/trunk/qpid/java/management/client/etc/qman-config.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/etc/qman-config.xml?rev=744478&r1=744477&r2=744478&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/client/etc/qman-config.xml (original) +++ qpid/trunk/qpid/java/management/client/etc/qman-config.xml Sat Feb 14 10:49:38 2009 @@ -18,13 +18,14 @@ - under the License. - --> -<!-- -Default configuration for QMan is empty; -that is, there's no broker configured at startup. -If you want to connect with a running broker when QMan starts up, -you can do that uncommenting and editing the template reported below. 
---> <configuration> + +<!-- +Default configuration for QMan has no broker settings; +that is, there's no broker configured at startup. +If you want to connect with a running broker when QMan starts up, +you can do that uncommenting and editing the template reported below. +--> <!-- <brokers> <broker> <host>localhost</host> @@ -47,5 +48,21 @@ <max-wait-timeout>-1</max-wait-timeout> </broker> </brokers> - --> + --> + <!-- Internal work manager configuration--> + <work-manager> + <!-- The size of the worker thread pool --> + <pool-capacity>5</pool-capacity> + + <!-- Maximum size of the worker thread pool --> + <max-pool-capacity>15</max-pool-capacity> + + <!-- + when the current number of threads is greater than + the pool-capacity, this is the maximum time that excess threads + can be in an idle state (without any task assigned) before terminating. + The value is expressed in milliseconds. + --> + <keep-alive-time>5000</keep-alive-time> + </work-manager> </configuration> Modified: qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java?rev=744478&r1=744477&r2=744478&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java (original) +++ qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java Sat Feb 14 10:49:38 2009 @@ -64,9 +64,12 @@ String QMAN_000029_DEFAULT_URI = "<QMAN-000029> : Default URI will be set to %s"; String QMAN_000030_RESOURCE_HAS_BEEN_CREATED = "<QMAN-000030> : New resource instance has been created and registered. 
Resource id is %s"; String QMAN_000031_RESOURCE_HAS_BEEN_REMOVED = "<QMAN-000031> : WS-Resource %s has been removed"; - String QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000032> : Events lifecycle topic has been created. Its name is %s"; - String QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000033> : Objects lifecycle topic has been created. Its name is %s"; - + String QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000032> : Events lifecycle topic has been created with name %s"; + String QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000033> : Objects lifecycle topic has been created with name %s"; + String QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000034> : Unclassified object types lifecycle topic has been created with name %s"; + String QMAN_000035_WORK_MANAGER_POOL_SIZE = "<QMAN-000035> : Work Manager thread pool size : %s"; + String QMAN_000036_WORK_MANAGER_MAX_POOL_SIZE = "<QMAN-000036> : Work Manager thread pool max size : %s"; + String QMAN_000037_WORK_MANAGER_KEEP_ALIVE_TIME = "<QMAN-000037> : Work Manager keep alive time : %s"; // DEBUG String QMAN_200001_INCOMING_MESSAGE_HAS_BEEN_RECEIVED = "<QMAN-200001> : New incoming message has been received. Message content is %s"; @@ -165,4 +168,8 @@ String QMAN_100035_GET_ATTRIBUTE_FAILURE = "<QMAN-100035> : Get Attribute invocation failure for attribute %s, resource %s."; String QMAN_100036_SET_ATTRIBUTE_FAILURE = "<QMAN-100036> : Set Attribute invocation failure for attribute %s, resource %s."; String QMAN_100037_INVOKE_OPERATION_FAILURE = "<QMAN-100037> : Operation Invocation failure for operation."; + String QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION = "<QMAN-100038> : Unable to send notification."; + String QMAN_100039_UNABLE_TO_CONFIGURE_PROPERLY_WORKER_MANAGER = "<QMAN-100039> : Unable to properly configure WorkManager. 
A malformed property (NaN) was given as input parameter."; + + } \ No newline at end of file Modified: qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java?rev=744478&r1=744477&r2=744478&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java (original) +++ qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java Sat Feb 14 10:49:38 2009 @@ -170,6 +170,12 @@ "EventsLifeCycleTopic", Names.PREFIX); + public final static QName UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME= new QName( + Names.NAMESPACE_URI, + "UnclassifiedLifeCycleTopic", + Names.PREFIX); + + public final static String NAME_ATTRIBUTE = "name"; public final static String MODIFIABILITY = "modifiability"; public final static String READ_WRITE = "read-write"; Modified: qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java?rev=744478&r1=744477&r2=744478&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java (original) +++ qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java Sat Feb 14 10:49:38 2009 @@ -26,9 +26,8 @@ import org.apache.qpid.transport.util.Logger; /** - * Parser used for building access mode mappings. 
- * For each access-mode-mappings/mapping element found in the configuration file, a new access mode mapping - * is built and injected into the bridge configuration. + * Parser used for building broker connection data settings. + * The corresponding section on the configuration file is : * <broker> <host>192.168.148.131</host> Modified: qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java?rev=744478&r1=744477&r2=744478&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java (original) +++ qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java Sat Feb 14 10:49:38 2009 @@ -25,13 +25,10 @@ import java.util.Set; import java.util.UUID; import java.util.Map.Entry; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.SynchronousQueue; import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; import org.apache.qpid.management.domain.handler.base.IMessageHandler; -import org.apache.qpid.management.domain.handler.impl.InvocationResult; import org.apache.qpid.management.domain.model.AccessMode; import org.apache.qpid.management.domain.model.type.Type; import org.apache.qpid.transport.DeliveryProperties; @@ -49,6 +46,11 @@ private final static Logger LOGGER = Logger.get(Configuration.class); private static Configuration INSTANCE = new Configuration(); + // Work Manager default settings + private int _poolSize = 5; + private int _maxPoolSize = 15; + private long _keepAliveTime = 5000; + Map<Integer, Type> _typeMappings = new HashMap<Integer,Type>(); Map<Integer,AccessMode> _accessModes = new HashMap<Integer, AccessMode>(); 
Map<Type,String> _validators = new HashMap<Type, String>(); @@ -64,8 +66,7 @@ private Header _headerForCommandMessages; private DeliveryProperties _deliveryProperties = new DeliveryProperties(); private MessageProperties _messageProperties = new MessageProperties(); - public BlockingQueue<InvocationResult> _resultExchangeChannel = new SynchronousQueue<InvocationResult>(); - + // Private constructor. private Configuration() { @@ -355,4 +356,68 @@ LOGGER.debug(Messages.QMAN_200004_MANAGEMENT_QUEUE_NAME,_managementQueueName); LOGGER.debug(Messages.QMAN_200005_METHOD_REPLY_QUEUE_NAME,_methodReplyQueueName); } -} + + /** + * Returns the worker manager thread pool size. + * + * @return the worker manager thread pool size. + */ + public int getWorkerManagerPoolSize() + { + return _poolSize; + } + + /** + * Sets the size of the worker manager thread pool. + * + * @param poolSize the size of the worker manager thread pool. + */ + void setWorkerManagerPoolSize(int poolSize) + { + this._poolSize = poolSize; + } + + /** + * Returns the maximum size of the worker manager + * thread pool size. + * + * @return the max size of the worker manager thread pool. + */ + public int getWorkerManagerMaxPoolSize() + { + return _maxPoolSize; + } + + /** + * Sets the maximum size of the worker manager + * thread pool size. + * + * @param maxPoolSize the max size of the worker manager thread pool. + */ + void setWorkerManagerMaxPoolSize(int maxPoolSize) + { + this._maxPoolSize = maxPoolSize; + } + + /** + * Returns the max amount of time that an excess thread + * can be idle before purging from the pool. + * + * @return the max keep alive time. + */ + public long getWorkerManagerKeepAliveTime() + { + return _keepAliveTime; + } + + /** + * Sets the max amount of time that an excess thread + * can be idle before purging from the pool. + * + * @param keepAliveTime the max keep alive time. 
+ */ + void setWorkerManagerKeepAliveTime(long keepAliveTime) + { + this._keepAliveTime = keepAliveTime; + } +} \ No newline at end of file Modified: qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java?rev=744478&r1=744477&r2=744478&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java (original) +++ qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java Sat Feb 14 10:49:38 2009 @@ -77,6 +77,7 @@ }; IParser _brokerConfigurationParser = new BrokerConnectionDataParser(); + IParser _workerManagerConfigurationParser = new WorkerManagerConfigurationParser(); IParser _currentParser = DEFAULT_PARSER; /** @@ -97,12 +98,18 @@ @Override public void startElement (String uri, String localName, String name, Attributes attributes) throws SAXException { - switch(Tag.get(name)) { - case BROKERS: + switch(Tag.get(name)) + { + case BROKERS : { _currentParser = _brokerConfigurationParser; break; } + case WORK_MANAGER : + { + _currentParser = _workerManagerConfigurationParser; + break; + } } } Modified: qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java?rev=744478&r1=744477&r2=744478&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java (original) +++ qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java 
Sat Feb 14 10:49:38 2009 @@ -37,7 +37,10 @@ VIRTUAL_HOST { @Override public String toString() { return "virtual-host"; }}, USER { @Override public String toString() { return "user"; }}, PASSWORD { @Override public String toString() { return "password"; }}, - BROKERS { @Override public String toString() { return "brokers"; }}; + BROKERS { @Override public String toString() { return "brokers"; }}, + WORK_MANAGER { @Override public String toString() { return "work-manager"; }}, + POOL_CAPACITY { @Override public String toString() { return "pool-capacity"; }}, + KEEP_ALIVE_TIME { @Override public String toString() { return "keep-alive-time"; }}; /** * Returns the enum entry associated to the given tag name. Added: qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java?rev=744478&view=auto ============================================================================== --- qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java (added) +++ qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java Sat Feb 14 10:49:38 2009 @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.configuration; + +import java.util.UUID; + +import org.apache.qpid.management.Messages; +import org.apache.qpid.transport.util.Logger; + +/** + * Parser used for building worker manager settings. + * The corresponding section of the configuration file is : + * + <work-manager> + <pool-capacity>5</pool-capacity> + <max-pool-capacity>15</max-pool-capacity> + <keep-alive-time>5000</keep-alive-time> + </work-manager> + + * + * @author Andrea Gazzarini + */ +class WorkerManagerConfigurationParser implements IParser +{ + private final static Logger LOGGER = Logger.get(Configuration.class); + private String _currentValue; + + private String _poolSizeAsString; + private String _maxPoolSizeAsString; + private String _keepAliveTimeAsString; + + /** + * Callback : the given value is the text content of the current node. + */ + public void setCurrrentAttributeValue (String value) + { + this._currentValue = value; + } + + /** + * Callback: each time the end of an element is reached + * this method is called. 
+ */ + public void setCurrentAttributeName (String name) + { + switch (Tag.get(name)) + { + case POOL_CAPACITY: + { + _poolSizeAsString = _currentValue.trim(); + break; + } + case MAX_POOL_CAPACITY : + { + _maxPoolSizeAsString = _currentValue; + } + case KEEP_ALIVE_TIME: + { + _keepAliveTimeAsString = _currentValue; + break; + } + case WORK_MANAGER: + { + Configuration configuration = Configuration.getInstance(); + try + { + configuration.setWorkerManagerPoolSize(Integer.parseInt(_poolSizeAsString)); + configuration.setWorkerManagerMaxPoolSize(Integer.parseInt(_maxPoolSizeAsString)); + configuration.setWorkerManagerKeepAliveTime(Long.parseLong(_keepAliveTimeAsString)); + } catch(Exception exception) + { + LOGGER.error(Messages.QMAN_100039_UNABLE_TO_CONFIGURE_PROPERLY_WORKER_MANAGER); + } finally { + LOGGER.info(Messages.QMAN_000035_WORK_MANAGER_POOL_SIZE,configuration.getWorkerManagerPoolSize()); + LOGGER.info(Messages.QMAN_000036_WORK_MANAGER_MAX_POOL_SIZE,configuration.getWorkerManagerMaxPoolSize()); + LOGGER.info(Messages.QMAN_000037_WORK_MANAGER_KEEP_ALIVE_TIME,configuration.getWorkerManagerKeepAliveTime()); + } + break; + } + } + } + + /** + * Gets an uuid in order to associate current connection data with a broker. 
+ * @return + */ + UUID getUUId(){ + return UUID.randomUUID(); + } +} Modified: qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java?rev=744478&r1=744477&r2=744478&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java (original) +++ qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java Sat Feb 14 10:49:38 2009 @@ -25,7 +25,11 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collection; -import java.util.UUID; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.management.InstanceNotFoundException; import javax.management.MBeanServer; @@ -46,6 +50,7 @@ import org.apache.muse.ws.notification.WsnConstants; import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; +import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.jmx.EntityLifecycleNotification; import org.apache.qpid.management.wsdm.common.ThreadSessionManager; import org.apache.qpid.management.wsdm.muse.engine.WSDMAdapterEnvironment; @@ -67,6 +72,40 @@ private MBeanServer _mxServer; private WsArtifactsFactory _artifactsFactory; private URI _resourceURI; + private NotificationProducer _publisherCapability; + private ThreadPoolExecutor _workManager; + private Map<String, QName> _lifeCycleTopics = new HashMap<String, QName>(); + + /** + * Runnable wrapper used for sending asynchronous + * notifications. 
+ * + * @author Andrea Gazzarini + */ + private final class AsynchNotificationTask implements Runnable + { + private final QName topicName; + private final LifeCycleEvent event; + + AsynchNotificationTask(QName tName, LifeCycleEvent evt) + { + topicName = tName; + event = evt; + } + + public void run() + { + try + { + _publisherCapability.publish(topicName,event); + } catch (SoapFault exception) + { + LOGGER.error( + exception, + Messages.QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION); + } + } + }; /** * NotificationFilter for "create" only events. @@ -99,7 +138,6 @@ { return EntityLifecycleNotification.INSTANCE_REMOVED_NOTIFICATION_TYPE.equals(notification.getType()); } - }; /** @@ -148,6 +186,16 @@ LOGGER.info( Messages.QMAN_000030_RESOURCE_HAS_BEEN_CREATED, eventSourceName); + + AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask( + getTopicName(lifecycleNotification.getClassKind()), + LifeCycleEvent.newCreateEvent( + eventSourceName.getKeyProperty(Names.OBJECT_ID), + lifecycleNotification.getPackageName(), + lifecycleNotification.getClassName())); + + _workManager.execute(asynchNotificationTask); + } catch (ArtifactsNotAvailableException exception) { LOGGER.error( @@ -213,6 +261,16 @@ LOGGER.info( Messages.QMAN_000031_RESOURCE_HAS_BEEN_REMOVED, eventSourceName); + + AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask( + getTopicName(lifecycleNotification.getClassKind()), + LifeCycleEvent.newRemoveEvent( + eventSourceName.getKeyProperty(Names.OBJECT_ID), + lifecycleNotification.getPackageName(), + lifecycleNotification.getClassName())); + + _workManager.execute(asynchNotificationTask); + } catch(Exception exception) { @@ -238,91 +296,14 @@ createLifeCycleTopics(); + initializeWorkManager(); + createQManResourceURI(); _mxServer = ManagementFactory.getPlatformMBeanServer(); _artifactsFactory = new WsArtifactsFactory(getEnvironment(),_mxServer); registerQManLifecycleListeners(); - - new Thread() - { - @Override - public 
void run() - { - while (true) - { - try - { - final NotificationProducer publisher = (NotificationProducer) getResource().getCapability(WsnConstants.PRODUCER_URI); - - publisher.publish( - Names.OBJECTS_LIFECYLE_TOPIC_NAME, - LifeCycleEvent.newCreateEvent( - UUID.randomUUID().toString(), - "org.apache.qpid.broker", - "connection")); - } catch (SoapFault e) - { - // TODO Auto-generated catch block - e.printStackTrace(); - } - try - { - Thread.sleep(10000); - } catch (InterruptedException e) - { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } - }.start(); - } - - /** - * This adapter capability needs to be an event listener of QMan JMX core - * in order to detect relevant lifecycle events and therefore create WS artifacts & notification(s). - * - * @throws SoapFault when it's not possible to register event listener : is QMan running? - */ - @SuppressWarnings("serial") - private void registerQManLifecycleListeners() throws SoapFault - { - try - { - _mxServer.addNotificationListener( - Names.QMAN_OBJECT_NAME, - _listenerForNewInstances, - _filterForNewInstances, - null); - - _mxServer.addNotificationListener( - Names.QMAN_OBJECT_NAME, - _listenerForRemovedInstances, - _filterForRemovedInstances, - null); - - try - { - _mxServer.addNotificationListener( - Names.QPID_EMULATOR_OBJECT_NAME, - _listenerForNewInstances, - _filterForNewInstances, null); - - _mxServer.addNotificationListener( - Names.QPID_EMULATOR_OBJECT_NAME, - _listenerForRemovedInstances, - _filterForRemovedInstances, null); - - } catch (Exception exception) - { - LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND); - } - } catch(InstanceNotFoundException exception) - { - throw new SoapFault(exception); - } } /** @@ -369,7 +350,7 @@ throw new SoapFault(exception); } } - + /** * Creates the message handlers for the given capability. * @@ -406,33 +387,68 @@ } return handlers; } + + /** + * Returns the publisher capability associated with the owner resource. 
+ * + * @return the publisher capability associated with the owner resource. + */ + NotificationProducer getPublisherCapability() + { + return (NotificationProducer) getResource().getCapability(WsnConstants.PRODUCER_URI); + } /** * Creates events & objects lifecycle topic that will be used to publish lifecycle event * messages.. */ - private void createLifeCycleTopics() + void createLifeCycleTopics() { try { - final NotificationProducer publisherCapability = (NotificationProducer) getResource() - .getCapability(WsnConstants.PRODUCER_URI); + _publisherCapability = getPublisherCapability(); - publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME); + _publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME); + _lifeCycleTopics.put(Names.EVENT,Names.EVENTS_LIFECYLE_TOPIC_NAME); + LOGGER.info( Messages.QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, Names.OBJECTS_LIFECYLE_TOPIC_NAME); - publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME); + _publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME); + _lifeCycleTopics.put(Names.CLASS,Names.OBJECTS_LIFECYLE_TOPIC_NAME); + LOGGER.info( Messages.QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, Names.OBJECTS_LIFECYLE_TOPIC_NAME); + + _publisherCapability.addTopic(Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME); + LOGGER.info( + Messages.QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, + Names.OBJECTS_LIFECYLE_TOPIC_NAME); } catch(Exception exception) { LOGGER.error(exception, Messages.QMAN_100036_TOPIC_DECLARATION_FAILURE); } } - + + /** + * Starting from an object type (i.e. event or class) returns the name of the + * corresponding topic where the lifecycle message must be published. + * Note that if the given object type is unknown then the "Unclassified Object Types" topic + * will be returned (and therefore the message will be published there). + * + * @param objectType the type of the object. + * @return the name of the topic associated with the given object type. 
+ */ + QName getTopicName(String objectType) + { + QName topicName = _lifeCycleTopics.get(objectType); + return (topicName != null) + ? topicName + : Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME; + } + /** * Workaround : it seems that is not possibile to declare a serializer * for a byte array using muse descriptor... @@ -471,4 +487,63 @@ throw new SoapFault(exception); } } + + /** + * Initializes the work manager used for asynchronous notifications. + */ + private void initializeWorkManager() + { + Configuration configuration = Configuration.getInstance(); + _workManager = new ThreadPoolExecutor( + configuration.getWorkerManagerPoolSize(), + configuration.getWorkerManagerMaxPoolSize(), + configuration.getWorkerManagerKeepAliveTime(), + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(30)); + } + + /** + * This adapter capability needs to be an event listener of QMan JMX core + * in order to detect relevant lifecycle events and therefore create WS artifacts & notification(s). + * + * @throws SoapFault when it's not possible to register event listener : is QMan running? 
+ */ + @SuppressWarnings("serial") + private void registerQManLifecycleListeners() throws SoapFault + { + try + { + _mxServer.addNotificationListener( + Names.QMAN_OBJECT_NAME, + _listenerForNewInstances, + _filterForNewInstances, + null); + + _mxServer.addNotificationListener( + Names.QMAN_OBJECT_NAME, + _listenerForRemovedInstances, + _filterForRemovedInstances, + null); + + try + { + _mxServer.addNotificationListener( + Names.QPID_EMULATOR_OBJECT_NAME, + _listenerForNewInstances, + _filterForNewInstances, null); + + _mxServer.addNotificationListener( + Names.QPID_EMULATOR_OBJECT_NAME, + _listenerForRemovedInstances, + _filterForRemovedInstances, null); + + } catch (Exception exception) + { + LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND); + } + } catch(InstanceNotFoundException exception) + { + throw new SoapFault(exception); + } + } } \ No newline at end of file Modified: qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml?rev=744478&r1=744477&r2=744478&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml (original) +++ qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml Sat Feb 14 10:49:38 2009 @@ -12,7 +12,12 @@ </layout> </appender> <category name="org.apache.qpid.management"> - <priority value="ERROR" /> + <priority value="INFO" /> + <appender-ref ref="CONSOLE" /> + </category> + + <category name="org.mortbay"> + <priority value="INFO" /> <appender-ref ref="CONSOLE" /> </category> Added: qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java?rev=744478&view=auto 
============================================================================== --- qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java (added) +++ qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java Sat Feb 14 10:49:38 2009 @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.wsdm.capabilities; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.apache.muse.ws.notification.NotificationProducer; +import org.apache.qpid.management.Names; + +import junit.framework.TestCase; + +/** + * Test case for QMan adapter capability. + * + * @author Andrea Gazzarini + */ +public class QManAdapterCapabilityTest extends TestCase +{ + /** + * Tests the execution of the getTopicName() method. + * + * <br>precondition : an object type is given to the method (null is allowed). + * <br>postcondition : according to getTopicName() specs, the name of the + * topic associated with the given object type must be returned. 
+ */ + public void testGetTopicName() + { + final InvocationHandler invocationHandler = new InvocationHandler(){ + + public Object invoke(Object proxy, Method method, Object[] args) + { + return null; + } + }; + + QManAdapterCapability capability = new QManAdapterCapability(){ + @Override + NotificationProducer getPublisherCapability() + { + return (NotificationProducer) Proxy.newProxyInstance( + getClass().getClassLoader(), + new Class[]{NotificationProducer.class}, + invocationHandler); + } + }; + + capability.createLifeCycleTopics(); + + assertEquals( + Names.EVENTS_LIFECYLE_TOPIC_NAME, + capability.getTopicName(Names.EVENT)); + + assertEquals( + Names.OBJECTS_LIFECYLE_TOPIC_NAME, + capability.getTopicName(Names.CLASS)); + + assertEquals( + Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME, + capability.getTopicName("This is an unknown object Type @#!-...@#")); + } +} \ No newline at end of file --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org