Author: cwiklik Date: Wed Jan 13 18:46:20 2016 New Revision: 1724479 URL: http://svn.apache.org/viewvc?rev=1724479&view=rev Log: UIMA-4730 Modified to support the new JMX query syntax used by ActiveMQ (AMQ) 5.8.0 and later. Older brokers are still supported.
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsServiceMonitor.java Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsServiceMonitor.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsServiceMonitor.java?rev=1724479&r1=1724478&r2=1724479&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsServiceMonitor.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsServiceMonitor.java Wed Jan 13 18:46:20 2016 @@ -20,6 +20,8 @@ package org.apache.uima.ducc.cli; import java.io.IOException; import java.text.DecimalFormat; +import java.util.Map; +import java.util.Map.Entry; import javax.management.MBeanServerConnection; import javax.management.MBeanServerInvocationHandler; @@ -67,13 +69,14 @@ public class UimaAsServiceMonitor String jmxFailure = null; - public UimaAsServiceMonitor(String qname, String broker_host, int broker_port) + String broker_host; + volatile boolean useNewJmxQueryString = false; + public UimaAsServiceMonitor(String qname, String broker_host, int broker_jmx_port) { - this.qname = qname; - this.broker_url = "service:jmx:rmi:///jndi/rmi://"+ broker_host + ":" + broker_port + "/jmxrmi"; + this.broker_host = broker_host; + this.qname = qname; + this.broker_url = "service:jmx:rmi:///jndi/rmi://"+ broker_host + ":" + broker_jmx_port + "/jmxrmi"; this.qstats = new ServiceStatistics(false, false, "N/A"); - // System.out.println("Broker url: " + broker_url); - } // public ServiceStatistics getStatistics() @@ -91,47 +94,76 @@ public class UimaAsServiceMonitor // return qstats; // } + private ObjectName getBrokerObjectName(MBeanServerConnection conn, String brokerQuery, String brokerNameProperty ) throws Exception { + 
ObjectName brokerObjectName = null; + for (Object nameObject : conn.queryNames(new ObjectName(brokerQuery), (QueryExp) null)) { + //find the brokername object + brokerObjectName = (ObjectName) nameObject; + + // the following code checks if the current object represents AMQ Broker MBean which looks like: + // org.apache.activemq:type=Broker,brokerName=localhost + // The above has just 2 properties: type and brokerName. We are after the brokerName here. Only + // one MBean has just these two properties. + Map<String,String> parts = brokerObjectName.getKeyPropertyList(); + boolean done = false; + if ( parts.size() == 2) { // looking for two properties Object only + for( Entry<String,String> entry : parts.entrySet()) { +// if ( entry.getKey().equals("brokerName")) { // need this property + if ( entry.getKey().equals(brokerNameProperty)) { // need this property + done = true; // found a match. Got a broker ObjectName + break; + } + } + } + if ( done ) { + break; + } +// if (brokerObjectName.getCanonicalName().endsWith("Type=Broker")) { + // Extract just the name from the canonical name + //String brokerName = brokerObjectName.getCanonicalName().substring(0, brokerObjectName.getCanonicalName().indexOf(",")); + //System.out.println("Canonical name of broker is " + brokerObjectName.getCanonicalName()); + //System.out.println("broker name is " + brokerName); + } + return brokerObjectName; + } + /** * Connect to ActiveMq and find the mbean for the queue we're trying to monitor */ public void init(String parm /* parm not used in this impl */) throws Exception { - // String methodName = "init"; - JMXServiceURL url = new JMXServiceURL(broker_url); jmxc = JMXConnectorFactory.connect(url); MBeanServerConnection conn = jmxc.getMBeanServerConnection(); String jmxDomain = "org.apache.activemq"; - + String brokerJmxDomain = null; + // - // First get the broker name + // First get the broker name. 
Assume we are connecting to AMQ broker version >= 5.8.0 // - ObjectName brokerObjectName = null; - for (Object nameObject : conn.queryNames(new ObjectName(jmxDomain + ":*,Type=Broker"), (QueryExp) null)) { - //find the brokername object - brokerObjectName = (ObjectName) nameObject; - //if (brokerObjectName.getCanonicalName().endsWith("Type=Broker")) { - // Extract just the name from the canonical name - //String brokerName = brokerObjectName.getCanonicalName().substring(0, brokerObjectName.getCanonicalName().indexOf(",")); - //System.out.println("Canonical name of broker is " + brokerObjectName.getCanonicalName()); - //System.out.println("broker name is " + brokerName); - //} - } - - //ObjectName activeMQ = new ObjectName("org.apache.activemq:BrokerName=" + broker_name +",Type=Broker"); - - brokerMBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, brokerObjectName ,BrokerViewMBean.class, true); - for (ObjectName name : brokerMBean.getQueues()) { - QueueViewMBean qView = (QueueViewMBean) - MBeanServerInvocationHandler.newProxyInstance(conn, name, QueueViewMBean.class, true); - - if ( qname.equals(qView.getName()) ) { - monitoredQueue = qView; - } else { - // System.out.println("Skipping queue " + qView.getName()); - } + String brokerNameProperty = "brokerName"; + String destinationId = ",destinationType=Queue,destinationName="; + ObjectName brokerObjectName = null; + String brokerQuery = jmxDomain + ":type=Broker,*"; + // Get ObjectName for the Broker. The getBrokerObjectName() returns null if + // version of broker < 5.8.0 + brokerObjectName = getBrokerObjectName(conn, brokerQuery, brokerNameProperty); + if ( brokerObjectName == null ) { // broker version 5.7 + // this is a fall back mechanism in case we are connected to an older broker ( < 5.8.0). + // In older versions of AMQ query syntax is different. 
+ brokerQuery = jmxDomain + ":Type=Broker,*"; + brokerNameProperty = "BrokerName"; + destinationId = ",Type=Queue,Destination="; + brokerObjectName = getBrokerObjectName(conn, brokerQuery, brokerNameProperty); + String bcn = brokerObjectName.getCanonicalName(); + brokerJmxDomain = bcn.substring(0,bcn.indexOf(",")); + } else { + brokerJmxDomain = brokerObjectName.getCanonicalName(); } + ObjectName targetQueueON = new ObjectName(brokerJmxDomain+destinationId+qname); + QueueViewMBean brokerQMBean = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, targetQueueON ,QueueViewMBean.class, true); + monitoredQueue = brokerQMBean; if ( monitoredQueue == null ) { throw new IllegalStateException("Cannot find queue: " + qname);