Author: agazzarini
Date: Sat Feb 14 10:49:38 2009
New Revision: 744478

URL: http://svn.apache.org/viewvc?rev=744478&view=rev
Log:
QPID-1664 : QMan Asynchronous notification capability

Added:
    
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java
    
qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java
Modified:
    qpid/trunk/qpid/java/management/client/etc/qman-config.xml
    
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java
    
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java
    
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java
    
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
    
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java
    
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java
    
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java
    qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml

Modified: qpid/trunk/qpid/java/management/client/etc/qman-config.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/etc/qman-config.xml?rev=744478&r1=744477&r2=744478&view=diff
==============================================================================
--- qpid/trunk/qpid/java/management/client/etc/qman-config.xml (original)
+++ qpid/trunk/qpid/java/management/client/etc/qman-config.xml Sat Feb 14 
10:49:38 2009
@@ -18,13 +18,14 @@
  - under the License.
  -
  -->
-<!-- 
-Default configuration for QMan is empty; 
-that is, there's no broker configured at startup.
-If  you want to connect with a running broker when QMan starts up, 
-you can do that uncommenting and editing the template reported below.
--->
 <configuration>
+
+<!-- 
+Default configuration for QMan has no broker settings; 
+that is, there's no broker configured at startup.
+If  you want to connect with a running broker when QMan starts up, 
+you can do that uncommenting and editing the template reported below.
+-->
 <!--   <brokers>
                <broker>
                        <host>localhost</host>
@@ -47,5 +48,21 @@
                        <max-wait-timeout>-1</max-wait-timeout>
                </broker>
   </brokers>
-  --> 
+  --> 
+  <!-- Internal work manager configuration -->
+       <work-manager>
+               <!-- The size of the worker thread pool -->
+               <pool-capacity>5</pool-capacity>
+
+               <!-- Maximum size of the worker thread pool -->
+               <max-pool-capacity>15</max-pool-capacity>
+
+               <!-- 
+                       when the current number of threads is greater than
+               the pool-capacity, this is the maximum time that excess threads
+               can be in an idle state (without any task assigned) before 
terminating.
+               The value is expressed in milliseconds.
+               -->
+               <keep-alive-time>5000</keep-alive-time>
+  </work-manager>
 </configuration>

Modified: 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java?rev=744478&r1=744477&r2=744478&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java
 (original)
+++ 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java
 Sat Feb 14 10:49:38 2009
@@ -64,9 +64,12 @@
        String QMAN_000029_DEFAULT_URI = "<QMAN-000029> : Default URI will be 
set to %s";
        String QMAN_000030_RESOURCE_HAS_BEEN_CREATED =  "<QMAN-000030> : New 
resource instance has been created and registered. Resource id is %s";
        String QMAN_000031_RESOURCE_HAS_BEEN_REMOVED = "<QMAN-000031> : 
WS-Resource %s has been removed";
-       String QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = 
"<QMAN-000032> : Events lifecycle topic has been created. Its name is %s";
-       String QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = 
"<QMAN-000033> : Objects lifecycle topic has been created. Its name is %s";
-       
+       String QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = 
"<QMAN-000032> : Events lifecycle topic has been created with name %s";
+       String QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = 
"<QMAN-000033> : Objects lifecycle topic has been created with name %s";
+       String QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = 
"<QMAN-000034> : Unclassified object types lifecycle topic has been created 
with name %s";
+       String QMAN_000035_WORK_MANAGER_POOL_SIZE = "<QMAN-000035> : Work 
Manager thread pool size : %s";
+       String QMAN_000036_WORK_MANAGER_MAX_POOL_SIZE = "<QMAN-000036> : Work 
Manager thread pool max size : %s";
+       String QMAN_000037_WORK_MANAGER_KEEP_ALIVE_TIME = "<QMAN-000035> : Work 
Manager keep alive time : %s";
                
         // DEBUG
        String QMAN_200001_INCOMING_MESSAGE_HAS_BEEN_RECEIVED = "<QMAN-200001> 
: New incoming message has been received. Message content is %s";
@@ -165,4 +168,8 @@
        String QMAN_100035_GET_ATTRIBUTE_FAILURE = "<QMAN-100035> : Get 
Attribute invocation failure for attribute %s, resource %s.";
        String QMAN_100036_SET_ATTRIBUTE_FAILURE = "<QMAN-100036> : Set 
Attribute invocation failure for attribute %s, resource %s.";
        String QMAN_100037_INVOKE_OPERATION_FAILURE = "<QMAN-100037> : 
Operation Invocation failure for operation.";    
+       String QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION = "<QMAN-100038> : 
Unable to send notification.";     
+       String QMAN_100039_UNABLE_TO_CONFIGURE_PROPERLY_WORKER_MANAGER = 
"<QMAN-100039> : Unable to properly configure WorkManager. A malformed property 
(NaN) was given as input parameter.";  
+       
+       
 }
\ No newline at end of file

Modified: 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java?rev=744478&r1=744477&r2=744478&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java
 (original)
+++ 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java
 Sat Feb 14 10:49:38 2009
@@ -170,6 +170,12 @@
                        "EventsLifeCycleTopic",
                        Names.PREFIX);
        
+       public final static QName UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME= new 
QName(
+                       Names.NAMESPACE_URI,
+                       "UnclassifiedLifeCycleTopic",
+                       Names.PREFIX);
+       
+       
        public final static String NAME_ATTRIBUTE = "name";
        public final static String MODIFIABILITY = "modifiability";
        public final static String READ_WRITE = "read-write";

Modified: 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java?rev=744478&r1=744477&r2=744478&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java
 (original)
+++ 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java
 Sat Feb 14 10:49:38 2009
@@ -26,9 +26,8 @@
 import org.apache.qpid.transport.util.Logger;
 
 /**
- * Parser used for building access mode mappings.
- * For each access-mode-mappings/mapping element found in the configuration 
file, a new access mode mapping 
- * is built and injected into the bridge configuration.
+ * Parser used for building broker connection data settings.
+ * The corresponding section on the configuration file is :
  * 
         <broker>
             <host>192.168.148.131</host>

Modified: 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java?rev=744478&r1=744477&r2=744478&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
 (original)
+++ 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
 Sat Feb 14 10:49:38 2009
@@ -25,13 +25,10 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
 
 import org.apache.qpid.management.Messages;
 import org.apache.qpid.management.Names;
 import org.apache.qpid.management.domain.handler.base.IMessageHandler;
-import org.apache.qpid.management.domain.handler.impl.InvocationResult;
 import org.apache.qpid.management.domain.model.AccessMode;
 import org.apache.qpid.management.domain.model.type.Type;
 import org.apache.qpid.transport.DeliveryProperties;
@@ -49,6 +46,11 @@
     private final static Logger LOGGER = Logger.get(Configuration.class);
     private static Configuration INSTANCE = new Configuration();
     
+    // Work Manager default settings
+    private int _poolSize = 5;
+    private int _maxPoolSize = 15;
+    private long _keepAliveTime = 5000;
+    
     Map<Integer, Type> _typeMappings = new HashMap<Integer,Type>();
     Map<Integer,AccessMode> _accessModes = new HashMap<Integer, AccessMode>();
     Map<Type,String> _validators = new HashMap<Type, String>();
@@ -64,8 +66,7 @@
     private Header _headerForCommandMessages;
     private DeliveryProperties _deliveryProperties = new DeliveryProperties();
     private MessageProperties _messageProperties = new MessageProperties();
-    public BlockingQueue<InvocationResult> _resultExchangeChannel = new 
SynchronousQueue<InvocationResult>();
-    
+        
     // Private constructor.
     private Configuration()
     {
@@ -355,4 +356,68 @@
         
LOGGER.debug(Messages.QMAN_200004_MANAGEMENT_QUEUE_NAME,_managementQueueName);
         
LOGGER.debug(Messages.QMAN_200005_METHOD_REPLY_QUEUE_NAME,_methodReplyQueueName);
        
     }
-}
+
+    /**
+     * Returns the worker manager thread pool size.
+     * 
+     * @return the worker manager thread pool size.
+     */
+       public int getWorkerManagerPoolSize()
+       {
+               return _poolSize;
+       }
+
+       /**
+        * Sets the size of the worker manager thread pool.
+        * 
+        * @param poolSize the size of the worker manager thread pool.
+        */
+       void setWorkerManagerPoolSize(int poolSize)
+       {
+               this._poolSize = poolSize;
+       }
+
+       /**
+        * Returns the maximum size of the worker manager 
+        * thread pool.
+        * 
+        * @return the max size of the worker manager thread pool.
+        */
+       public int getWorkerManagerMaxPoolSize()
+       {
+               return _maxPoolSize;
+       }
+
+       /**
+        * Sets the maximum size of the worker manager 
+        * thread pool.
+        * 
+        * @param maxPoolSize the max size of the worker manager thread pool.
+        */     
+       void setWorkerManagerMaxPoolSize(int maxPoolSize)
+       {
+               this._maxPoolSize = maxPoolSize;
+       }
+
+       /**
+        * Returns the max amount of time that an excess thread
+        * can be idle before purging from the pool.
+        * 
+        * @return the max keep alive time.
+        */
+       public long getWorkerManagerKeepAliveTime()
+       {
+               return _keepAliveTime;
+       }
+
+       /**
+        * Sets the max amount of time that an excess thread
+        * can be idle before purging from the pool.
+        * 
+        * @param keepAliveTime the max keep alive time.
+        */
+       void setWorkerManagerKeepAliveTime(long keepAliveTime)
+       {
+               this._keepAliveTime = keepAliveTime;
+       }
+}
\ No newline at end of file

Modified: 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java?rev=744478&r1=744477&r2=744478&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java
 (original)
+++ 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java
 Sat Feb 14 10:49:38 2009
@@ -77,6 +77,7 @@
     };
     
     IParser _brokerConfigurationParser = new BrokerConnectionDataParser();
+    IParser _workerManagerConfigurationParser = new 
WorkerManagerConfigurationParser();
     IParser _currentParser = DEFAULT_PARSER;
 
     /**
@@ -97,12 +98,18 @@
     @Override
     public void startElement (String uri, String localName, String name, 
Attributes attributes) throws SAXException
     {
-        switch(Tag.get(name)) {
-            case BROKERS: 
+        switch(Tag.get(name)) 
+        {
+            case BROKERS : 
             {
                 _currentParser = _brokerConfigurationParser;
                 break;
             }
+            case WORK_MANAGER : 
+            {
+               _currentParser = _workerManagerConfigurationParser;
+               break;
+            }
         }
     }
     

Modified: 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java?rev=744478&r1=744477&r2=744478&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java
 (original)
+++ 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java
 Sat Feb 14 10:49:38 2009
@@ -37,7 +37,10 @@
        VIRTUAL_HOST { @Override public String toString() { return 
"virtual-host"; }},
        USER { @Override public String toString() { return "user"; }},
        PASSWORD { @Override public String toString() { return "password"; }},
-       BROKERS { @Override public String toString() { return "brokers"; }};
+       BROKERS { @Override public String toString() { return "brokers"; }},
+       WORK_MANAGER { @Override public String toString() { return 
"work-manager"; }},
+       POOL_CAPACITY  { @Override public String toString() { return 
"pool-capacity"; }},
+       KEEP_ALIVE_TIME { @Override public String toString() { return 
"keep-alive-time"; }};
        
        /**
         * Returns the enum entry associated to the given tag name.

Added: 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java?rev=744478&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java
 (added)
+++ 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java
 Sat Feb 14 10:49:38 2009
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import java.util.UUID;
+
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Parser used for building worker manager settings.
+ * The corresponding section of the configuration file is :
+ * 
+       <work-manager>
+               <pool-capacity>5</pool-capacity>
+               <max-pool-capacity>15</max-pool-capacity>
+               <keep-alive-time>5000</keep-alive-time>
+  </work-manager>
+
+ * 
+ * @author Andrea Gazzarini
+ */
+class WorkerManagerConfigurationParser implements IParser
+{
+    private final static Logger LOGGER = Logger.get(Configuration.class);
+    private String _currentValue;
+    
+    private String _poolSizeAsString;
+    private String _maxPoolSizeAsString;
+    private String _keepAliveTimeAsString;
+    
+    /**
+     * Callback : the given value is the text content of the current node.
+     */
+    public void setCurrrentAttributeValue (String value)
+    {
+        this._currentValue = value;
+    }
+
+    /**
+     * Callback: each time the end of an element is reached 
+     * this method is called.
+     */
+    public void setCurrentAttributeName (String name)
+    {
+        switch (Tag.get(name))
+        {
+            case POOL_CAPACITY: 
+            {
+                _poolSizeAsString = _currentValue.trim();
+                break;
+            }
+            case MAX_POOL_CAPACITY : 
+            {
+               _maxPoolSizeAsString = _currentValue;
+            }
+            case KEEP_ALIVE_TIME: 
+            {
+               _keepAliveTimeAsString = _currentValue;
+                break;
+            }
+            case WORK_MANAGER: 
+            {
+               Configuration configuration = Configuration.getInstance();
+                try 
+                {
+                       
configuration.setWorkerManagerPoolSize(Integer.parseInt(_poolSizeAsString));
+                    
configuration.setWorkerManagerMaxPoolSize(Integer.parseInt(_maxPoolSizeAsString));
+                    
configuration.setWorkerManagerKeepAliveTime(Long.parseLong(_keepAliveTimeAsString));
                    
+                } catch(Exception exception) 
+                {
+                    
LOGGER.error(Messages.QMAN_100039_UNABLE_TO_CONFIGURE_PROPERLY_WORKER_MANAGER);
+                } finally {
+                    
LOGGER.info(Messages.QMAN_000035_WORK_MANAGER_POOL_SIZE,configuration.getWorkerManagerPoolSize());
+                    
LOGGER.info(Messages.QMAN_000036_WORK_MANAGER_MAX_POOL_SIZE,configuration.getWorkerManagerMaxPoolSize());
+                    
LOGGER.info(Messages.QMAN_000037_WORK_MANAGER_KEEP_ALIVE_TIME,configuration.getWorkerManagerKeepAliveTime());
                      
+                }
+                break;
+            }
+        }
+    }
+    
+    /**
+     * Gets an uuid in order to associate current connection data with a 
broker.
+     * @return
+     */
+    UUID getUUId(){
+      return UUID.randomUUID();  
+    }
+}

Modified: 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java?rev=744478&r1=744477&r2=744478&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java
 (original)
+++ 
qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java
 Sat Feb 14 10:49:38 2009
@@ -25,7 +25,11 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanServer;
@@ -46,6 +50,7 @@
 import org.apache.muse.ws.notification.WsnConstants;
 import org.apache.qpid.management.Messages;
 import org.apache.qpid.management.Names;
+import org.apache.qpid.management.configuration.Configuration;
 import org.apache.qpid.management.jmx.EntityLifecycleNotification;
 import org.apache.qpid.management.wsdm.common.ThreadSessionManager;
 import org.apache.qpid.management.wsdm.muse.engine.WSDMAdapterEnvironment;
@@ -67,6 +72,40 @@
        private MBeanServer _mxServer;
        private WsArtifactsFactory _artifactsFactory; 
        private URI _resourceURI;
+       private NotificationProducer _publisherCapability;
+       private ThreadPoolExecutor _workManager;
+       private Map<String, QName> _lifeCycleTopics = new HashMap<String, 
QName>();
+       
+       /**
+        * Runnable wrapper used for sending asynchronous 
+        * notifications.
+        * Each instance carries one lifecycle event together with the topic
+        * it has to be published on; the publication itself happens in run().
+        * 
+        * @author Andrea Gazzarini
+        */
+       private final class AsynchNotificationTask implements Runnable 
+       {
+               // Topic the event will be published on.
+               private final QName topicName;
+               // Lifecycle event that will be published.
+               private final LifeCycleEvent event;
+               
+               /**
+                * Builds a new task with the given topic name and event.
+                * 
+                * @param tName the name of the target topic.
+                * @param evt the event to be published.
+                */
+               AsynchNotificationTask(QName tName, LifeCycleEvent evt)
+               {
+                       topicName = tName;
+                       event = evt;
+               }
+               
+               /**
+                * Publishes the event on the topic using the publisher capability 
+                * of the enclosing instance.
+                * A SoapFault raised by the publication is logged and not propagated.
+                */
+               public void run()
+               {
+                       try
+                       {
+                               _publisherCapability.publish(topicName,event);
+                       } catch (SoapFault exception)
+                       {
+                               LOGGER.error(
+                                               exception,
+                                               
Messages.QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION);
+                       }                       
+               }
+       };
        
        /**
         * NotificationFilter for "create" only events.
@@ -99,7 +138,6 @@
                {
                        return 
EntityLifecycleNotification.INSTANCE_REMOVED_NOTIFICATION_TYPE.equals(notification.getType());
                }
-               
        };
        
        /**
@@ -148,6 +186,16 @@
                                LOGGER.info(
                                                
Messages.QMAN_000030_RESOURCE_HAS_BEEN_CREATED,
                                                eventSourceName);
+                               
+                               AsynchNotificationTask asynchNotificationTask = 
new AsynchNotificationTask(
+                                               
getTopicName(lifecycleNotification.getClassKind()),
+                                               LifeCycleEvent.newCreateEvent(
+                                                               
eventSourceName.getKeyProperty(Names.OBJECT_ID), 
+                                                               
lifecycleNotification.getPackageName(),
+                                                               
lifecycleNotification.getClassName()));
+                               
+                               _workManager.execute(asynchNotificationTask);
+                               
                        } catch (ArtifactsNotAvailableException exception) 
                        {
                                LOGGER.error(
@@ -213,6 +261,16 @@
                                LOGGER.info(
                                                
Messages.QMAN_000031_RESOURCE_HAS_BEEN_REMOVED, 
                                                eventSourceName);
+
+                               AsynchNotificationTask asynchNotificationTask = 
new AsynchNotificationTask(
+                                               
getTopicName(lifecycleNotification.getClassKind()),
+                                               LifeCycleEvent.newRemoveEvent(
+                                                               
eventSourceName.getKeyProperty(Names.OBJECT_ID), 
+                                                               
lifecycleNotification.getPackageName(),
+                                                               
lifecycleNotification.getClassName()));
+                               
+                               _workManager.execute(asynchNotificationTask);
+
                        }
                        catch(Exception exception) 
                        {
@@ -238,91 +296,14 @@
                
                createLifeCycleTopics();
                
+               initializeWorkManager();
+               
                createQManResourceURI();
 
                _mxServer = ManagementFactory.getPlatformMBeanServer();
                _artifactsFactory = new 
WsArtifactsFactory(getEnvironment(),_mxServer);
                
                registerQManLifecycleListeners();       
-               
-               new Thread()
-               {
-                       @Override
-                       public void run()
-                       {
-                               while (true)
-                               {
-                                       try
-                                       {
-                                               final NotificationProducer 
publisher = (NotificationProducer) 
getResource().getCapability(WsnConstants.PRODUCER_URI);                   
-                                               
-                                               publisher.publish(
-                                                               
Names.OBJECTS_LIFECYLE_TOPIC_NAME, 
-                                                               
LifeCycleEvent.newCreateEvent(
-                                                                               
UUID.randomUUID().toString(), 
-                                                                               
"org.apache.qpid.broker",
-                                                                               
"connection"));
-                                       } catch (SoapFault e)
-                                       {
-                                               // TODO Auto-generated catch 
block
-                                               e.printStackTrace();
-                                       }
-                                       try
-                                       {
-                                               Thread.sleep(10000);
-                                       } catch (InterruptedException e)
-                                       {
-                                               // TODO Auto-generated catch 
block
-                                               e.printStackTrace();
-                                       }
-                               } 
-                       }
-               }.start();
-       }
-
-       /**
-        * This adapter capability needs to be an event listener of QMan JMX 
core 
-        * in order to detect relevant lifecycle events and therefore create WS 
artifacts & notification(s).
-        * 
-        * @throws SoapFault when it's not possible to register event listener 
: is QMan running?
-        */
-       @SuppressWarnings("serial")
-       private void registerQManLifecycleListeners() throws SoapFault
-       {
-               try 
-               {                       
-                       _mxServer.addNotificationListener(
-                                       Names.QMAN_OBJECT_NAME, 
-                                       _listenerForNewInstances, 
-                                       _filterForNewInstances, 
-                                       null);
-                       
-                       _mxServer.addNotificationListener(
-                                       Names.QMAN_OBJECT_NAME, 
-                                       _listenerForRemovedInstances, 
-                                       _filterForRemovedInstances, 
-                                       null);
-
-                       try 
-                       {
-                               _mxServer.addNotificationListener(
-                                               
Names.QPID_EMULATOR_OBJECT_NAME, 
-                                               _listenerForNewInstances, 
-                                               _filterForNewInstances, null);
-
-                               _mxServer.addNotificationListener(
-                                               
Names.QPID_EMULATOR_OBJECT_NAME, 
-                                               _listenerForRemovedInstances, 
-                                               _filterForRemovedInstances, 
null);
-
-                       } catch (Exception exception) 
-                       {
-                               
LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND);
-                       } 
-               }  catch(InstanceNotFoundException exception) 
-               {
-                       throw new SoapFault(exception); 
-               }
        }
 
        /**
@@ -369,7 +350,7 @@
                        throw new SoapFault(exception);
                }
        }
-
+               
        /**
         * Creates the message handlers for the given capability.
         * 
@@ -406,33 +387,68 @@
         }
         return handlers;       
     }
+
+       /**
+        * Returns the publisher capability associated with the owner resource.
+        * 
+        * @return the publisher capability associated with the owner resource.
+        */
+       NotificationProducer getPublisherCapability()
+       {
+               return (NotificationProducer) getResource().getCapability(WsnConstants.PRODUCER_URI);
+       }
        
        /**
         * Creates events & objects lifecycle topic that will be used to publish lifecycle event
         * messages..
         */
-       private void createLifeCycleTopics() 
+       void createLifeCycleTopics() 
        {
                try 
                {
-                       final NotificationProducer publisherCapability = (NotificationProducer) getResource()
-                                       .getCapability(WsnConstants.PRODUCER_URI);
+                       _publisherCapability = getPublisherCapability();
                        
-                       publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME);
+                       _publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME);
+                       _lifeCycleTopics.put(Names.EVENT,Names.EVENTS_LIFECYLE_TOPIC_NAME);
+
                        LOGGER.info(
                                        Messages.QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, 
                                        Names.OBJECTS_LIFECYLE_TOPIC_NAME);
                        
-                       publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME);
+                       _publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME);
+                       _lifeCycleTopics.put(Names.CLASS,Names.OBJECTS_LIFECYLE_TOPIC_NAME);
+
                        LOGGER.info(
                                        Messages.QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, 
                                        Names.OBJECTS_LIFECYLE_TOPIC_NAME);
+                       
+                       _publisherCapability.addTopic(Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME);
+                       LOGGER.info(
+                                       Messages.QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, 
+                                       Names.OBJECTS_LIFECYLE_TOPIC_NAME);
                } catch(Exception exception) 
                {
                        LOGGER.error(exception, Messages.QMAN_100036_TOPIC_DECLARATION_FAILURE);
                }
        }
-
+       
+       /**
+        * Starting from an object type (i.e. event or class) returns the name of the
+        * corresponding topic where the lifecycle message must be published.
+        * Note that if the given object type is unknown then the "Unclassified Object Types" topic 
+        * will be returned (and therefore the message will be published there).
+        * 
+        * @param objectType the type of the object.
+        * @return the name of the topic associated with the given object type.
+        */
+       QName getTopicName(String objectType) 
+       {
+               QName topicName = _lifeCycleTopics.get(objectType);
+               return (topicName != null) 
+                       ? topicName 
+                       : Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME;
+       }
+       
        /** 
         * Workaround : it seems that is not possibile to declare a serializer 
         * for a byte array using muse descriptor...
@@ -471,4 +487,63 @@
                        throw new SoapFault(exception);
                }
        }       
+       
+       /**
+        * Initializes the work manager used for asynchronous notifications.
+        */
+       private void initializeWorkManager()
+       {
+               Configuration configuration = Configuration.getInstance();
+               _workManager = new ThreadPoolExecutor(
+                               configuration.getWorkerManagerPoolSize(),
+                               configuration.getWorkerManagerMaxPoolSize(),
+                               configuration.getWorkerManagerKeepAliveTime(),
+                               TimeUnit.MILLISECONDS,
+                               new ArrayBlockingQueue<Runnable>(30));
+       }
+
+       /**
+        * This adapter capability needs to be an event listener of QMan JMX core 
+        * in order to detect relevant lifecycle events and therefore create WS artifacts & notification(s).
+        * 
+        * @throws SoapFault when it's not possible to register event listener : is QMan running?
+        */
+       @SuppressWarnings("serial")
+       private void registerQManLifecycleListeners() throws SoapFault
+       {
+               try 
+               {                       
+                       _mxServer.addNotificationListener(
+                                       Names.QMAN_OBJECT_NAME, 
+                                       _listenerForNewInstances, 
+                                       _filterForNewInstances, 
+                                       null);
+                       
+                       _mxServer.addNotificationListener(
+                                       Names.QMAN_OBJECT_NAME, 
+                                       _listenerForRemovedInstances, 
+                                       _filterForRemovedInstances, 
+                                       null);
+
+                       try 
+                       {
+                               _mxServer.addNotificationListener(
+                                               Names.QPID_EMULATOR_OBJECT_NAME, 
+                                               _listenerForNewInstances, 
+                                               _filterForNewInstances, null);
+
+                               _mxServer.addNotificationListener(
+                                               Names.QPID_EMULATOR_OBJECT_NAME, 
+                                               _listenerForRemovedInstances, 
+                                               _filterForRemovedInstances, null);
+
+                       } catch (Exception exception) 
+                       {
+                               LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND);
+                       } 
+               }  catch(InstanceNotFoundException exception) 
+               {
+                       throw new SoapFault(exception); 
+               }
+       }       
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml?rev=744478&r1=744477&r2=744478&view=diff
==============================================================================
--- qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml (original)
+++ qpid/trunk/qpid/java/management/client/src/test/java/log4j.xml Sat Feb 14 10:49:38 2009
@@ -12,7 +12,12 @@
                </layout>
        </appender>
        <category name="org.apache.qpid.management">
-               <priority value="ERROR" />
+               <priority value="INFO" />
+               <appender-ref ref="CONSOLE" />
+       </category>     
+
+       <category name="org.mortbay">
+               <priority value="INFO" />
                <appender-ref ref="CONSOLE" />
        </category>     
 

Added: qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java?rev=744478&view=auto
==============================================================================
--- qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java (added)
+++ qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java Sat Feb 14 10:49:38 2009
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.wsdm.capabilities;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.muse.ws.notification.NotificationProducer;
+import org.apache.qpid.management.Names;
+
+import junit.framework.TestCase;
+
+/**
+ * Test case for QMan adapter capability.
+ * 
+ * @author Andrea Gazzarini
+ */
+public class QManAdapterCapabilityTest extends TestCase
+{      
+       /**
+        * Tests the execution of the getTopicName() method.
+        * 
+        * <br>precondition : an object type is given to the method (null is allowed).
+        * <br>postcondition : according to getTopicName() specs, the name of the 
+        *              topic associated with the given object type must be returned.
+        */
+       public void testGetTopicName() 
+       {
+               final InvocationHandler invocationHandler = new InvocationHandler(){
+
+                       public Object invoke(Object proxy, Method method, Object[] args) 
+                       {
+                               return null;
+                       }
+               };
+               
+               QManAdapterCapability capability = new QManAdapterCapability(){
+                       @Override
+                       NotificationProducer getPublisherCapability()
+                       {
+                               return (NotificationProducer) Proxy.newProxyInstance(
+                                               getClass().getClassLoader(), 
+                                               new Class[]{NotificationProducer.class},
+                                               invocationHandler);
+                       }
+               };
+               
+               capability.createLifeCycleTopics();
+               
+               assertEquals(
+                               Names.EVENTS_LIFECYLE_TOPIC_NAME,
+                               capability.getTopicName(Names.EVENT));
+               
+               assertEquals(
+                               Names.OBJECTS_LIFECYLE_TOPIC_NAME,
+                               capability.getTopicName(Names.CLASS));          
+               
+               assertEquals(
+                               Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME,
+                               capability.getTopicName("This is an unknown object Type @#!-...@#"));
+       }
+}
\ No newline at end of file



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to