Author: jstrachan
Date: Tue Aug 19 05:56:40 2008
New Revision: 687043
URL: http://svn.apache.org/viewvc?rev=687043&view=rev
Log:
added some helper mbean methods so that you can ask for the consumers on a
DestinationViewMBean
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
Tue Aug 19 05:56:40 2008
@@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.io.IOException;
import javax.jms.Connection;
import javax.jms.InvalidSelectorException;
@@ -33,11 +34,14 @@
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+import javax.management.ObjectName;
+import javax.management.MalformedObjectNameException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
@@ -341,5 +345,18 @@
public void setUseCache(boolean value) {
destination.setUseCache(value);
- }
+ }
+
+ public ObjectName[] getSubscriptions() throws IOException,
MalformedObjectNameException {
+ List<Subscription> subscriptions = destination.getConsumers();
+ ObjectName[] answer = new ObjectName[subscriptions.size()];
+ ObjectName objectName =
broker.getBrokerService().getBrokerObjectName();
+ int index = 0;
+ for (Subscription subscription : subscriptions) {
+ String connectionClientId =
subscription.getContext().getClientId();
+ String objectNameStr =
ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId,
objectName);
+ answer[index++] = new ObjectName(objectNameStr);
+ }
+ return answer;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
Tue Aug 19 05:56:40 2008
@@ -18,11 +18,14 @@
import java.util.List;
import java.util.Map;
+import java.io.IOException;
import javax.jms.InvalidSelectorException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
+import javax.management.ObjectName;
+import javax.management.MalformedObjectNameException;
public interface DestinationViewMBean {
@@ -259,4 +262,11 @@
*/
public void setUseCache(boolean value);
+ /**
+ * Returns all the current subscription MBeans matching this destination
+ *
+ * @return the names of the subscriptions for this destination
+ */
+ ObjectName[] getSubscriptions() throws IOException,
MalformedObjectNameException;
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
Tue Aug 19 05:56:40 2008
@@ -168,27 +168,11 @@
}
public ObjectName registerSubscription(ConnectionContext context,
Subscription sub) {
- Hashtable map = brokerObjectName.getKeyPropertyList();
- String objectNameStr = brokerObjectName.getDomain() + ":" +
"BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
- String destinationType = "destinationType=" +
sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
- String destinationName = "destinationName=" +
JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
- String clientId = "clientId=" +
JMXSupport.encodeObjectNamePart(context.getClientId());
- String persistentMode = "persistentMode=";
- String consumerId = "";
+ String connectionClientId = context.getClientId();
+ ObjectName brokerJmxObjectName = brokerObjectName;
+ String objectNameStr = getSubscriptionObjectName(sub,
connectionClientId, brokerJmxObjectName);
+
SubscriptionKey key = new SubscriptionKey(context.getClientId(),
sub.getConsumerInfo().getSubscriptionName());
- if (sub.getConsumerInfo().isDurable()) {
- persistentMode += "Durable, subscriptionID=" +
JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
- } else {
- persistentMode += "Non-Durable";
- if (sub.getConsumerInfo() != null &&
sub.getConsumerInfo().getConsumerId() != null) {
- consumerId = ",consumerId=" +
JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
- }
- }
- objectNameStr += persistentMode + ",";
- objectNameStr += destinationType + ",";
- objectNameStr += destinationName + ",";
- objectNameStr += clientId;
- objectNameStr += consumerId;
try {
ObjectName objectName = new ObjectName(objectNameStr);
SubscriptionView view;
@@ -210,6 +194,31 @@
}
}
+ public static String getSubscriptionObjectName(Subscription sub, String
connectionClientId, ObjectName brokerJmxObjectName) {
+ Hashtable map = brokerJmxObjectName.getKeyPropertyList();
+ String brokerDomain = brokerJmxObjectName.getDomain();
+ String objectNameStr = brokerDomain + ":" + "BrokerName=" +
map.get("BrokerName") + ",Type=Subscription,";
+ String destinationType = "destinationType=" +
sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
+ String destinationName = "destinationName=" +
JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
+ String clientId = "clientId=" +
JMXSupport.encodeObjectNamePart(connectionClientId);
+ String persistentMode = "persistentMode=";
+ String consumerId = "";
+ if (sub.getConsumerInfo().isDurable()) {
+ persistentMode += "Durable, subscriptionID=" +
JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
+ } else {
+ persistentMode += "Non-Durable";
+ if (sub.getConsumerInfo() != null &&
sub.getConsumerInfo().getConsumerId() != null) {
+ consumerId = ",consumerId=" +
JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
+ }
+ }
+ objectNameStr += persistentMode + ",";
+ objectNameStr += destinationType + ",";
+ objectNameStr += destinationName + ",";
+ objectNameStr += clientId;
+ objectNameStr += consumerId;
+ return objectNameStr;
+ }
+
public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub);
if (name != null) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java
Tue Aug 19 05:56:40 2008
@@ -57,4 +57,6 @@
topicSubscription.setMaximumPendingMessages(max);
}
}
+
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Tue Aug 19 05:56:40 2008
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.util.List;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
@@ -168,4 +169,5 @@
*/
void isFull(ConnectionContext context,Usage usage);
+ List<Subscription> getConsumers();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Tue Aug 19 05:56:40 2008
@@ -19,6 +19,8 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
+import java.util.List;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -100,6 +102,10 @@
next.stop();
}
+ public List<Subscription> getConsumers() {
+ return next.getConsumers();
+ }
+
/**
* Sends a message to the given destination which may be a wildcard
*/
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Aug 19 05:56:40 2008
@@ -115,7 +115,13 @@
this.taskFactory=taskFactory;
this.dispatchSelector=new QueueDispatchSelector(destination);
}
-
+
+ public List<Subscription> getConsumers() {
+ synchronized (consumers) {
+ return new ArrayList<Subscription>(consumers);
+ }
+ }
+
public void initialize() throws Exception {
if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Tue Aug 19 05:56:40 2008
@@ -221,4 +221,6 @@
* @return the number of messages this subscription can accept before it's
full
*/
int countBeforeFull();
+
+ ConnectionContext getContext();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Tue Aug 19 05:56:40 2008
@@ -19,6 +19,8 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -106,6 +108,12 @@
}
}
+ public List<Subscription> getConsumers() {
+ synchronized (consumers) {
+ return new ArrayList<Subscription>(consumers);
+ }
+ }
+
public boolean lock(MessageReference node, LockOwner sub) {
return true;
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java?rev=687043&r1=687042&r2=687043&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
Tue Aug 19 05:56:40 2008
@@ -140,6 +140,11 @@
dispatched.add(qmr);
}
+ public ConnectionContext getContext() {
+ // TODO
+ return null;
+ }
+
public void add(ConnectionContext context, Destination destination)
throws Exception {
// TODO Auto-generated method stub