Repository: activemq Updated Branches: refs/heads/master 2b84cd60b -> 10c998b0b
https://issues.apache.org/jira/browse/AMQ-6129 Adding NetworkBridgeStatistics and also a received count for bridges when they are in duplex mode. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/10c998b0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/10c998b0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/10c998b0 Branch: refs/heads/master Commit: 10c998b0bc9728276a738ed24d20c6fc82c6365a Parents: 2b84cd6 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Fri Jan 15 14:39:49 2016 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Fri Jan 15 14:54:04 2016 +0000 ---------------------------------------------------------------------- .../activemq/broker/jmx/NetworkBridgeView.java | 16 +++ .../broker/jmx/NetworkBridgeViewMBean.java | 2 + .../network/DemandForwardingBridgeSupport.java | 27 +++-- .../apache/activemq/network/NetworkBridge.java | 5 + .../network/NetworkBridgeStatistics.java | 102 +++++++++++++++++++ .../org/apache/activemq/bugs/AMQ4160Test.java | 8 ++ .../activemq/network/DuplexNetworkTest.java | 15 +++ .../activemq/network/SimpleNetworkTest.java | 26 ++++- 8 files changed, 190 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java index c557754..756ffbe 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java @@ -31,43 +31,58 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean { this.bridge = bridge; } + @Override public void start() throws Exception { bridge.start(); } + @Override public void stop() throws Exception { bridge.stop(); } + @Override public String getLocalAddress() { return bridge.getLocalAddress(); } + @Override public String getRemoteAddress() { return bridge.getRemoteAddress(); } + @Override public String getRemoteBrokerName() { return bridge.getRemoteBrokerName(); } + @Override public String getRemoteBrokerId() { return bridge.getRemoteBrokerId(); } + @Override public String getLocalBrokerName() { return bridge.getLocalBrokerName(); } + @Override public long getEnqueueCounter() { return bridge.getEnqueueCounter(); } + @Override public long getDequeueCounter() { return bridge.getDequeueCounter(); } + @Override + public long getReceivedCounter() { + return bridge.getNetworkBridgeStatistics().getReceivedCount().getCount(); + } + + @Override public boolean isCreatedByDuplex() { return createByDuplex; } @@ -76,6 +91,7 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean { this.createByDuplex = createByDuplex; } + @Override public void resetStats(){ bridge.resetStats(); for (NetworkDestinationView networkDestinationView:networkDestinationViewList){ http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java index 639a81a..82fc9ca 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java @@ -34,6 +34,8 @@ public interface NetworkBridgeViewMBean extends Service { long getDequeueCounter(); + long getReceivedCounter(); + boolean isCreatedByDuplex(); void resetStats(); http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 112f5ff..7f3eeb4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -34,7 +34,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import javax.management.ObjectName; @@ -139,8 +138,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null}; protected BrokerId remoteBrokerId; - final AtomicLong enqueueCounter = new AtomicLong(); - final AtomicLong dequeueCounter = new AtomicLong(); + protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics(); private NetworkBridgeListener networkBridgeListener; private boolean createdByDuplex; @@ -181,6 +179,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br throw new IllegalArgumentException("BrokerService is null on " + this); } + networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics()); + if (isDuplex()) { duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker()); duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() { @@ -640,6 +640,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br Response reply = resp.getResult(); reply.setCorrelationId(correlationId); remoteBroker.oneway(reply); + //increment counter when messages are received in duplex mode + networkBridgeStatistics.getReceivedCount().increment(); } catch (IOException error) { LOG.error("Exception: {} on duplex forward of: {}", error, message); serviceRemoteException(error); @@ -648,6 +650,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br }); } else { duplexInboundLocalBroker.oneway(message); + networkBridgeStatistics.getReceivedCount().increment(); } serviceInboundMessage(message); } else { @@ -968,7 +971,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br try { if (command.isMessageDispatch()) { safeWaitUntilStarted(); - enqueueCounter.incrementAndGet(); + networkBridgeStatistics.getEnqueues().increment(); final MessageDispatch md = (MessageDispatch) command; final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { @@ -1016,7 +1019,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serviceLocalException(md, er.getException()); } else { localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); - dequeueCounter.incrementAndGet(); + networkBridgeStatistics.getDequeues().increment(); } } catch (IOException e) { serviceLocalException(md, e); @@ -1033,7 +1036,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br try { remoteBroker.oneway(message); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); - dequeueCounter.incrementAndGet(); + networkBridgeStatistics.getDequeues().increment(); } finally { sub.decrementOutstandingResponses(); } @@ -1559,12 +1562,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br @Override public long getDequeueCounter() { - return dequeueCounter.get(); + return networkBridgeStatistics.getDequeues().getCount(); } @Override public long getEnqueueCounter() { - return enqueueCounter.get(); + return networkBridgeStatistics.getEnqueues().getCount(); + } + + @Override + public NetworkBridgeStatistics getNetworkBridgeStatistics() { + return networkBridgeStatistics; } protected boolean isDuplex() { @@ -1594,8 +1602,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br @Override public void resetStats() { - enqueueCounter.set(0); - dequeueCounter.set(0); + networkBridgeStatistics.reset(); } /* http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java index 4ed03dc..fb9e3d9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java @@ -79,6 +79,11 @@ public interface NetworkBridge extends Service { long getDequeueCounter(); /** + * @return the statistics for this NetworkBridge + */ + NetworkBridgeStatistics getNetworkBridgeStatistics(); + + /** * @param objectName * The ObjectName assigned to this bridge in the MBean server. */ http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java new file mode 100644 index 0000000..6b39d0a --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.network; + +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.management.StatsImpl; + +/** + * The Statistics for a NewtorkBridge. + */ +public class NetworkBridgeStatistics extends StatsImpl { + + protected CountStatisticImpl enqueues; + protected CountStatisticImpl dequeues; + protected CountStatisticImpl receivedCount; + + public NetworkBridgeStatistics() { + enqueues = new CountStatisticImpl("enqueues", "The current number of enqueues this bridge has, which is the number of potential messages to be forwarded."); + dequeues = new CountStatisticImpl("dequeues", "The current number of dequeues this bridge has, which is the number of messages received by the remote broker."); + receivedCount = new CountStatisticImpl("receivedCount", "The number of messages that have been received by the NetworkBridge from the remote broker. Only applies for Duplex bridges."); + + addStatistic("enqueues", enqueues); + addStatistic("dequeues", dequeues); + addStatistic("receivedCount", receivedCount); + } + + /** + * The current number of enqueues this bridge has, which is the number of potential messages to be forwarded + * Messages may not be forwarded if there is no subscription + * + * @return + */ + public CountStatisticImpl getEnqueues() { + return enqueues; + } + + /** + * The current number of dequeues this bridge has, which is the number of + * messages actually sent to and received by the remote broker. + * + * @return + */ + public CountStatisticImpl getDequeues() { + return dequeues; + } + + /** + * The number of messages that have been received by the NetworkBridge from the remote broker. + * Only applies for Duplex bridges. + * + * @return + */ + public CountStatisticImpl getReceivedCount() { + return receivedCount; + } + + @Override + public void reset() { + if (this.isDoReset()) { + super.reset(); + enqueues.reset(); + dequeues.reset(); + receivedCount.reset(); + } + } + + @Override + public void setEnabled(boolean enabled) { + super.setEnabled(enabled); + enqueues.setEnabled(enabled); + dequeues.setEnabled(enabled); + receivedCount.setEnabled(enabled); + } + + public void setParent(NetworkBridgeStatistics parent) { + if (parent != null) { + enqueues.setParent(parent.enqueues); + dequeues.setParent(parent.dequeues); + receivedCount.setParent(parent.receivedCount); + } else { + enqueues.setParent(null); + dequeues.setParent(null); + receivedCount.setParent(null); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java index 4867f28..5563ded 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java @@ -35,6 +35,7 @@ import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkBridge; import org.apache.activemq.network.NetworkBridgeListener; +import org.apache.activemq.network.NetworkBridgeStatistics; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; @@ -56,6 +57,7 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { * Since these tests involve wait conditions, protect against indefinite * waits (due to unanticipated issues). */ + @Override public void setUp() throws Exception { setAutoFail(true); setMaxTestTime(MAX_TEST_TIME); @@ -328,6 +330,11 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { } @Override + public NetworkBridgeStatistics getNetworkBridgeStatistics() { + return next.getNetworkBridgeStatistics(); + } + + @Override public void setMbeanObjectName(ObjectName objectName) { next.setMbeanObjectName(objectName); } @@ -337,6 +344,7 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { return next.getMbeanObjectName(); } + @Override public void resetStats(){ next.resetStats(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java index 14b2e0f..da29dd6 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java @@ -86,4 +86,19 @@ public class DuplexNetworkTest extends SimpleNetworkTest { return id; } + @Override + protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception { + + final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + expectedRemoteSent == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount(); + } + })); + + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index d7e987c..a49301f 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -96,6 +96,8 @@ public class SimpleNetworkTest { } // ensure no more messages received assertNull(consumer1.receive(1000)); + + assertNetworkBridgeStatistics(MESSAGE_COUNT, 0); } @Test(timeout = 60 * 1000) @@ -128,6 +130,8 @@ public class SimpleNetworkTest { assertNotNull(result); LOG.info(result.getText()); } + + assertNetworkBridgeStatistics(MESSAGE_COUNT, MESSAGE_COUNT); } @Test(timeout = 60 * 1000) @@ -136,13 +140,15 @@ public class SimpleNetworkTest { MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded); MessageProducer includedProducer = localSession.createProducer(included); MessageProducer excludedProducer = localSession.createProducer(excluded); - // allow for consumer infos to perculate arround + // allow for consumer infos to perculate around Thread.sleep(2000); Message test = localSession.createTextMessage("test"); includedProducer.send(test); excludedProducer.send(test); assertNull(excludedConsumer.receive(1000)); assertNotNull(includedConsumer.receive(1000)); + + assertNetworkBridgeStatistics(1, 0); } @Test(timeout = 60 * 1000) @@ -163,6 +169,8 @@ public class SimpleNetworkTest { // ensure no more messages received assertNull(consumer1.receive(1000)); assertNull(consumer2.receive(1000)); + + assertNetworkBridgeStatistics(MESSAGE_COUNT, 0); } private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception { @@ -319,4 +327,20 @@ public class SimpleNetworkTest { protected BrokerService createRemoteBroker() throws Exception { return createBroker(getRemoteBrokerURI()); } + + protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception { + + final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + 0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() && + expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + 0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount(); + } + })); + } }