Author: rajdavies
Date: Wed Jan 16 05:56:24 2008
New Revision: 612459
URL: http://svn.apache.org/viewvc?rev=612459&view=rev
Log:
set correct consumer count on consumer advisories
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Wed Jan 16 05:56:24 2008
@@ -17,6 +17,7 @@
package org.apache.activemq.advisory;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
@@ -83,7 +84,7 @@
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic =
AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.put(info.getConsumerId(), info);
- fireConsumerAdvisory(context, topic, info);
+ fireConsumerAdvisory(context,info.getDestination(), topic, info);
} else {
// We need to replay all the previously collected state objects
@@ -114,7 +115,7 @@
for (Iterator<ProducerInfo> iter =
producers.values().iterator(); iter.hasNext();) {
ProducerInfo value = iter.next();
ActiveMQTopic topic =
AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
- fireProducerAdvisory(context, topic, value,
info.getConsumerId());
+ fireProducerAdvisory(context,
value.getDestination(),topic, value, info.getConsumerId());
}
}
@@ -123,7 +124,7 @@
for (Iterator<ConsumerInfo> iter =
consumers.values().iterator(); iter.hasNext();) {
ConsumerInfo value = iter.next();
ActiveMQTopic topic =
AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
- fireConsumerAdvisory(context, topic, value,
info.getConsumerId());
+ fireConsumerAdvisory(context,value.getDestination(),
topic, value, info.getConsumerId());
}
}
}
@@ -219,7 +220,7 @@
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic =
AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.remove(info.getConsumerId());
- fireConsumerAdvisory(context, topic, info.createRemoveCommand());
+ fireConsumerAdvisory(context,info.getDestination(), topic,
info.createRemoveCommand());
}
}
@@ -230,7 +231,7 @@
if (info.getDestination() != null &&
!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic =
AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
producers.remove(info.getProducerId());
- fireProducerAdvisory(context, topic, info.createRemoveCommand());
+ fireProducerAdvisory(context, info.getDestination(),topic,
info.createRemoveCommand());
}
}
@@ -253,21 +254,28 @@
fireAdvisory(context, topic, command, targetConsumerId,
advisoryMessage);
}
- protected void fireConsumerAdvisory(ConnectionContext context,
ActiveMQTopic topic, Command command) throws Exception {
- fireConsumerAdvisory(context, topic, command, null);
+ protected void fireConsumerAdvisory(ConnectionContext context,
ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command)
throws Exception {
+ fireConsumerAdvisory(context, consumerDestination,topic, command,
null);
}
- protected void fireConsumerAdvisory(ConnectionContext context,
ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws
Exception {
+ protected void fireConsumerAdvisory(ConnectionContext context,
ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command,
ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setIntProperty("consumerCount", consumers.size());
+ int count = 0;
+ Set<Destination>set = getDestinations(consumerDestination);
+ if (set != null) {
+ for (Destination dest:set) {
+ count +=
dest.getDestinationStatistics().getConsumers().getCount();
+ }
+ }
+ advisoryMessage.setIntProperty("consumerCount", count);
fireAdvisory(context, topic, command, targetConsumerId,
advisoryMessage);
}
- protected void fireProducerAdvisory(ConnectionContext context,
ActiveMQTopic topic, Command command) throws Exception {
- fireProducerAdvisory(context, topic, command, null);
+ protected void fireProducerAdvisory(ConnectionContext
context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command
command) throws Exception {
+ fireProducerAdvisory(context,producerDestination, topic, command,
null);
}
- protected void fireProducerAdvisory(ConnectionContext context,
ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws
Exception {
+ protected void fireProducerAdvisory(ConnectionContext context,
ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command,
ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setIntProperty("producerCount", producers.size());
fireAdvisory(context, topic, command, targetConsumerId,
advisoryMessage);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Wed Jan 16 05:56:24 2008
@@ -65,7 +65,7 @@
return next.getDestinationMap();
}
- public Set getDestinations(ActiveMQDestination destination) {
+ public Set <Destination>getDestinations(ActiveMQDestination destination) {
return next.getDestinations(destination);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Wed Jan 16 05:56:24 2008
@@ -725,16 +725,20 @@
if (persistenceAdapter == null) {
persistenceAdapter = createPersistenceAdapter();
configureService(persistenceAdapter);
+ this.persistenceAdapter =
registerPersistenceAdapterMBean(persistenceAdapter);
}
return persistenceAdapter;
}
/**
* Sets the persistence adaptor implementation to use for this broker
+ * @throws IOException
*/
- public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
+ public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter)
throws IOException {
this.persistenceAdapter = persistenceAdapter;
configureService(this.persistenceAdapter);
+ this.persistenceAdapter =
registerPersistenceAdapterMBean(persistenceAdapter);
+
}
public TaskRunnerFactory getTaskRunnerFactory() {
@@ -1311,6 +1315,24 @@
throw IOExceptionSupport.create("Transport Connector could
not be registered in JMX: " + e.getMessage(), e);
}
}
+ }
+ }
+
+ protected PersistenceAdapter
registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
+ MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+ if (mbeanServer != null) {
+
+
+ }
+ return adaptor;
+ }
+
+ protected void unregisterPersistenceAdapterMBean(PersistenceAdapter
adaptor) throws IOException {
+ if (isUseJmx()) {
+ MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+ if (mbeanServer != null) {
+
+ }
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
Wed Jan 16 05:56:24 2008
@@ -131,6 +131,6 @@
*
* @return a set of matching destination objects.
*/
- Set getDestinations(ActiveMQDestination destination);
+ Set <Destination>getDestinations(ActiveMQDestination destination);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Wed Jan 16 05:56:24 2008
@@ -120,7 +120,7 @@
return answer;
}
- public Set getDestinations(ActiveMQDestination destination) {
+ public Set <Destination> getDestinations(ActiveMQDestination destination) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.getDestinations(destination);