Author: rajdavies
Date: Wed Jan 16 11:03:02 2008
New Revision: 612542
URL: http://svn.apache.org/viewvc?rev=612542&view=rev
Log:
Add producers to DestinationStatistics
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Wed Jan 16 11:03:02 2008
@@ -277,7 +277,17 @@
protected void fireProducerAdvisory(ConnectionContext context,
ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command,
ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setIntProperty("producerCount", producers.size());
+ int count = 0; // producers summed over every destination matching producerDestination
+ if (producerDestination != null) {
+ Set<Destination> set = getDestinations(producerDestination);
+ if (set != null) {
+ for (Destination dest : set) {
+ count += dest.getDestinationStatistics().getProducers() // producer stat, not consumers
+ .getCount();
+ }
+ }
+ }
+ advisoryMessage.setIntProperty("producerCount", count);
fireAdvisory(context, topic, command, targetConsumerId,
advisoryMessage);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Wed Jan 16 11:03:02 2008
@@ -37,6 +37,7 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.DestinationFilter;
@@ -408,5 +409,26 @@
public void setAutoCreateDestinations(boolean autoCreateDestinations) {
this.autoCreateDestinations = autoCreateDestinations;
}
+
+ public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception{
+ for (Iterator iter =
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+ Destination dest = (Destination)iter.next();
+ dest.addProducer(context, info);
+ }
+ }
+
+ /**
+ * Removes a Producer.
+ * @param context the environment the operation is being executed under.
+ * @throws Exception if a matching destination fails to remove the producer.
+ */
+ public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception{
+ for (Iterator iter =
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+ Destination dest = (Destination)iter.next();
+ dest.removeProducer(context, info);
+ }
+ }
+
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Wed Jan 16 11:03:02 2008
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.broker.region;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ProducerInfo;
+
/**
* @version $Revision: 1.12 $
@@ -26,6 +29,7 @@
private int maxProducersToAudit=1024;
private int maxAuditDepth=1;
private boolean enableAudit=true;
+ protected final DestinationStatistics destinationStatistics = new
DestinationStatistics();
/**
* @return the producerFlowControl
*/
@@ -73,6 +77,14 @@
*/
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
+ }
+
+ public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception{
+ destinationStatistics.getProducers().increment();
+ }
+
+ public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception{
+ destinationStatistics.getProducers().decrement();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Wed Jan 16 11:03:02 2008
@@ -25,6 +25,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -37,6 +38,10 @@
void addSubscription(ConnectionContext context, Subscription sub) throws
Exception;
void removeSubscription(ConnectionContext context, Subscription sub)
throws Exception;
+
+ void addProducer(ConnectionContext context, ProducerInfo info) throws
Exception;
+
+ void removeProducer(ConnectionContext context, ProducerInfo info) throws
Exception;
void send(ProducerBrokerExchange producerExchange, Message messageSend)
throws Exception;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Wed Jan 16 11:03:02 2008
@@ -27,6 +27,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -128,6 +129,17 @@
public void setProducerFlowControl(boolean value){
next.setProducerFlowControl(value);
+ }
+
+ public void addProducer(ConnectionContext context, ProducerInfo info)
+ throws Exception {
+ next.addProducer(context, info);
+
+ }
+
+ public void removeProducer(ConnectionContext context, ProducerInfo info)
+ throws Exception {
+ next.removeProducer(context, info);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
Wed Jan 16 11:03:02 2008
@@ -32,6 +32,7 @@
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl consumers;
+ protected CountStatisticImpl producers;
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
@@ -43,6 +44,7 @@
dispatched = new CountStatisticImpl("dispatched", "The number of
messages that have been dispatched from the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages
that have been acknowledged from the destination");
consumers = new CountStatisticImpl("consumers", "The number of
consumers that that are subscribing to messages from the destination");
+ producers = new CountStatisticImpl("producers", "The number of
producers that that are publishing messages to the destination");
messages = new CountStatisticImpl("messages", "The number of messages
that that are being held by the destination");
messagesCached = new PollCountStatisticImpl("messagesCached", "The
number of messages that are held in the destination's memory cache");
processTime = new TimeStatisticImpl("processTime", "information around
length of time messages are held by a destination");
@@ -50,6 +52,7 @@
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
addStatistic("consumers", consumers);
+ addStatistic("producers", producers); // key must match the statistic's own name
addStatistic("messages", messages);
addStatistic("messagesCached", messagesCached);
addStatistic("processTime", processTime);
@@ -66,6 +69,10 @@
public CountStatisticImpl getConsumers() {
return consumers;
}
+
+ public CountStatisticImpl getProducers() {
+ return producers;
+ }
public PollCountStatisticImpl getMessagesCached() {
return messagesCached;
@@ -100,6 +107,7 @@
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
consumers.setEnabled(enabled);
+ producers.setEnabled(enabled);
messages.setEnabled(enabled);
messagesCached.setEnabled(enabled);
processTime.setEnabled(enabled);
@@ -112,6 +120,7 @@
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
consumers.setParent(parent.consumers);
+ producers.setParent(parent.producers);
messagesCached.setParent(parent.messagesCached);
messages.setParent(parent.messages);
processTime.setParent(parent.processTime);
@@ -120,6 +129,7 @@
dispatched.setParent(null);
dequeues.setParent(null);
consumers.setParent(null);
+ producers.setParent(null);
messagesCached.setParent(null);
messages.setParent(null);
processTime.setParent(null);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Jan 16 11:03:02 2008
@@ -79,7 +79,6 @@
private final List<Subscription> consumers = new
ArrayList<Subscription>(50);
private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage;
- private final DestinationStatistics destinationStatistics = new
DestinationStatistics();
private PendingMessageCursor messages;
private final LinkedList<MessageReference> pagedInMessages = new
LinkedList<MessageReference>();
private LockOwner exclusiveOwner;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
Wed Jan 16 11:03:02 2008
@@ -29,6 +29,7 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
@@ -86,6 +87,21 @@
* @throws Exception TODO
*/
void removeConsumer(ConnectionContext context, ConsumerInfo info) throws
Exception;
+
+ /**
+ * Adds a Producer.
+ * @param context the environment the operation is being executed under.
+ * @throws Exception TODO
+ */
+ void addProducer(ConnectionContext context, ProducerInfo info) throws
Exception;
+
+ /**
+ * Removes a Producer.
+ * @param context the environment the operation is being executed under.
+ * @throws Exception TODO
+ */
+ void removeProducer(ConnectionContext context, ProducerInfo info) throws
Exception;
+
/**
* Deletes a durable subscription.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Wed Jan 16 11:03:02 2008
@@ -322,10 +322,45 @@
public void removeSession(ConnectionContext context, SessionInfo info)
throws Exception {
}
- public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
+ public void addProducer(ConnectionContext context, ProducerInfo info)
+ throws Exception {
+ ActiveMQDestination destination = info.getDestination();
+ if (destination != null) {
+ switch (destination.getDestinationType()) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ queueRegion.addProducer(context, info);
+ break;
+ case ActiveMQDestination.TOPIC_TYPE:
+ topicRegion.addProducer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ tempQueueRegion.addProducer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ tempTopicRegion.addProducer(context, info);
+ break;
+ }
+ }
}
public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
+ ActiveMQDestination destination = info.getDestination();
+ if (destination != null) {
+ switch (destination.getDestinationType()) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ queueRegion.removeProducer(context, info);
+ break;
+ case ActiveMQDestination.TOPIC_TYPE:
+ topicRegion.removeProducer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ tempQueueRegion.removeProducer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ tempTopicRegion.removeProducer(context, info);
+ break;
+ }
+ }
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo
info) throws Exception {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed Jan 16 11:03:02 2008
@@ -76,8 +76,7 @@
protected final TopicMessageStore store;
private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage;
- protected final DestinationStatistics destinationStatistics = new
DestinationStatistics();
-
+
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;