I setup a test system to better understand how networked brokers interact and
messages, etc. are seen via JMX. The receiving broker is receiving the
messages sent by the transmit broker but when I view it via JMX it shows
that the messages were not dequeued. I've ran a network test before and
viewed it as the messages were dequeued so I don't know what the difference
is this time. On the transmit side the JMX shows 0 for producers, queue
size, dispatch count, enqueue count, etc. Why does the side networking to
another broker show nothing for these but the server side broker does? Code
below
JMSMonitor.java // used to monitor via JMX
public class JMSMonitor
{
public static void main(final String[] args)
{
if(args.length != 3)
{
System.out.println("Invalid argument set:Three arguments required:
IP, broker domain name and broker name");
}
else
{
try
{
final String ip = args[0];
final String domainName = args[1];
final String brokerName = args[2];
final int rmiregport = 1099;
final String url = "service:jmx:rmi:///jndi/rmi://" + ip + ":" +
rmiregport + "/jmxrmi";
System.out.println("JMXServiceURL address=" + url);
final JMXServiceURL address = new JMXServiceURL(url);
final JMXConnector connector =
JMXConnectorFactory.connect(address,
null);
connector.connect();
final MBeanServerConnection connection =
connector.getMBeanServerConnection();
final ObjectName jmsManagementObjectName = new
ObjectName(domainName + ":BrokerName=" + brokerName + ",Type=Broker");
final BrokerViewMBean brokerMBean =
MBeanServerInvocationHandler.newProxyInstance(connection,
jmsManagementObjectName, BrokerViewMBean.class, true);
final Console console = System.console();
String userInput;
do
{
System.out.println("Number of queues=" +
brokerMBean.getQueues().length);
for(final ObjectName queueName : brokerMBean.getQueues())
{
final QueueViewMBean queueMbean =
MBeanServerInvocationHandler.newProxyInstance(connection, queueName,
QueueViewMBean.class, true);
if(queueMbean.getName().startsWith("Van") ||
queueMbean.getName().equals("START"))
{
System.out.println("Queue name=" + queueMbean.getName()
+
", consumer count=" +
queueMbean.getConsumerCount() +
", producer count=" +
queueMbean.getProducerCount() +
", queue size=" +
queueMbean.getQueueSize() +
", dispatch count=" +
queueMbean.getDispatchCount() +
", enqueue count=" +
queueMbean.getEnqueueCount() +
", dequeue count=" +
queueMbean.getDequeueCount());
}
}
userInput = console.readLine("Enter 'exit' to end or any key
to continue\n==>");
}
while(userInput.equalsIgnoreCase("exit") == false);
}
catch(final Exception e)
{
e.printStackTrace();
}
}
}
}
activemq.xml // Transmit broker
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="networkTestTransmit"
dataDirectory="${activemq.base}/data"
destroyApplicationContextOnStop="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true"
memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext connectorPort="1099"
jmxDomainName="transmit" />
</managementContext>
<networkConnectors>
<networkConnector uri="static:(tcp://IP of receiving
broker:61616)"
duplex="true"/>
</networkConnectors>
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
<import resource="jetty.xml"/>
</beans>
MsgSenderTest.java // Posts JMS messages
public class MsgSenderTest
{
private static ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://IP of receiver computer:61616");
/**
* @param args
*/
public static void main(final String[] args_)
{
System.out.println("Connecting to ActiveMQ:" +
MsgSenderTest.connectionFactory.getBrokerURL());
if(args_.length < 2)
{
System.out.println("Test number not specified as first argument or
number of messages as second argument");
System.exit(0);
}
Connection connection = null;
Session startTopicSession = null;
MessageProducer startProducer = null;
try
{
final int numberOfMessages = Integer.parseInt(args_[1]);
connection = MsgSenderTest.connectionFactory.createConnection();
connection.start();
startTopicSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
final Queue startQueue =
startTopicSession.createQueue("VanTestTransmit" + args_[0]);
startProducer = startTopicSession.createProducer(startQueue);
for(int i = 0; i < numberOfMessages; i++)
{
System.out.println("Sending message #" + (i + 1));
startProducer.send(startTopicSession.createMessage());
try
{
Thread.sleep(500);
}
catch(final Exception e2){}
}
final Message message = startTopicSession.createMessage();
message.setStringProperty("END", "");
startProducer.send(message);
}
catch(final JMSException e) {
e.printStackTrace(); }
finally
{
if(startProducer != null)
{
try
{
System.out.println("Closing Producer");
startProducer.close();
}
catch(final JMSException e) {
e.printStackTrace(); }
}
if(startTopicSession != null)
{
try
{
System.out.println("Closing Session");
startTopicSession.close();
}
catch(final JMSException e) {
e.printStackTrace(); }
}
if(connection != null)
{
try
{
System.out.println("Closing Connection");
connection.close();
}
catch(final JMSException e) {
e.printStackTrace(); }
}
}
}
}
activemq.xml // ActiveMQ config file for receiving broker
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd"
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="networkTestReceive"
dataDirectory="${activemq.base}/data"
destroyApplicationContextOnStop="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true"
memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext connectorPort="1099"
jmxDomainName="receive" />
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
<import resource="jetty.xml"/>
</beans>
MsgListenerTest.java // Receives messages
public class MsgListenerTest implements MessageListener
{
private static ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
private Connection connection;
private Session session;
private MessageConsumer consumer;
private int msgNumber = 1;
public static void main(final String[] args_)
{
if(args_.length < 1)
{
System.out.println("Queue id not defined in arg #1");
System.exit(0);
}
new MsgListenerTest("VanTestTransmit" + args_[0]);
}
public MsgListenerTest(final String queueName_)
{
try
{
this.connection =
MsgListenerTest.connectionFactory.createConnection();
this.connection.start();
this.session = this.connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
this.consumer =
this.session.createConsumer(this.session.createQueue(queueName_));
this.consumer.setMessageListener(this);
}
catch(final Exception e)
{
e.printStackTrace();
}
}
@Override
public void onMessage(final Message message_)
{
try
{
if(message_.getStringProperty("END") == null)
{
System.out.println("Received messaage #" + this.msgNumber);
this.msgNumber++;
}
else
{
if(this.consumer != null)
{
System.out.println("Closing Consumer");
try
{
this.consumer.close();
}
catch(final JMSException e) {
e.printStackTrace(); }
}
if(this.session != null)
{
System.out.println("Closing Session");
try
{
this.session.close();
}
catch(final JMSException e) {
e.printStackTrace(); }
}
if(this.connection != null)
{
System.out.println("Closing Connection");
try
{
this.connection.close();
}
catch(final JMSException e) {
e.printStackTrace(); }
}
}
}
catch(final JMSException e2) { e2.printStackTrace();
}
}
}
--
View this message in context:
http://activemq.2283324.n4.nabble.com/Understanding-Networking-and-JMX-tp3922968p3922968.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.