Repository: activemq
Updated Branches:
  refs/heads/master 2b84cd60b -> 10c998b0b


https://issues.apache.org/jira/browse/AMQ-6129

Adding NetworkBridgeStatistics and also a received count for bridges
when they are in duplex mode.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/10c998b0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/10c998b0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/10c998b0

Branch: refs/heads/master
Commit: 10c998b0bc9728276a738ed24d20c6fc82c6365a
Parents: 2b84cd6
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Fri Jan 15 14:39:49 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Fri Jan 15 14:54:04 2016 +0000

----------------------------------------------------------------------
 .../activemq/broker/jmx/NetworkBridgeView.java  |  16 +++
 .../broker/jmx/NetworkBridgeViewMBean.java      |   2 +
 .../network/DemandForwardingBridgeSupport.java  |  27 +++--
 .../apache/activemq/network/NetworkBridge.java  |   5 +
 .../network/NetworkBridgeStatistics.java        | 102 +++++++++++++++++++
 .../org/apache/activemq/bugs/AMQ4160Test.java   |   8 ++
 .../activemq/network/DuplexNetworkTest.java     |  15 +++
 .../activemq/network/SimpleNetworkTest.java     |  26 ++++-
 8 files changed, 190 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
index c557754..756ffbe 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
@@ -31,43 +31,58 @@ public class NetworkBridgeView implements 
NetworkBridgeViewMBean {
         this.bridge = bridge;
     }
 
+    @Override
     public void start() throws Exception {
         bridge.start();
     }
 
+    @Override
     public void stop() throws Exception {
         bridge.stop();
     }
 
+    @Override
     public String getLocalAddress() {
         return bridge.getLocalAddress();
     }
 
+    @Override
     public String getRemoteAddress() {
         return bridge.getRemoteAddress();
     }
 
+    @Override
     public String getRemoteBrokerName() {
         return bridge.getRemoteBrokerName();
     }
 
 
+    @Override
     public String getRemoteBrokerId() {
         return bridge.getRemoteBrokerId();
     }
 
+    @Override
     public String getLocalBrokerName() {
         return bridge.getLocalBrokerName();
     }
 
+    @Override
     public long getEnqueueCounter() {
         return bridge.getEnqueueCounter();
     }
 
+    @Override
     public long getDequeueCounter() {
         return bridge.getDequeueCounter();
     }
 
+    @Override
+    public long getReceivedCounter() {
+        return 
bridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
+    }
+
+    @Override
     public boolean isCreatedByDuplex() {
         return createByDuplex;
     }
@@ -76,6 +91,7 @@ public class NetworkBridgeView implements 
NetworkBridgeViewMBean {
         this.createByDuplex = createByDuplex;
     }
 
+    @Override
     public void resetStats(){
         bridge.resetStats();
         for (NetworkDestinationView 
networkDestinationView:networkDestinationViewList){

http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java
index 639a81a..82fc9ca 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java
@@ -34,6 +34,8 @@ public interface NetworkBridgeViewMBean extends Service {
 
     long getDequeueCounter();
 
+    long getReceivedCounter();
+
     boolean isCreatedByDuplex();
 
     void resetStats();

http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 112f5ff..7f3eeb4 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -34,7 +34,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.ObjectName;
 
@@ -139,8 +138,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
     protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
     protected BrokerId remoteBrokerId;
 
-    final AtomicLong enqueueCounter = new AtomicLong();
-    final AtomicLong dequeueCounter = new AtomicLong();
+    protected final NetworkBridgeStatistics networkBridgeStatistics = new 
NetworkBridgeStatistics();
 
     private NetworkBridgeListener networkBridgeListener;
     private boolean createdByDuplex;
@@ -181,6 +179,8 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                 throw new IllegalArgumentException("BrokerService is null on " 
+ this);
             }
 
+            
networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
+
             if (isDuplex()) {
                 duplexInboundLocalBroker = 
NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
                 duplexInboundLocalBroker.setTransportListener(new 
DefaultTransportListener() {
@@ -640,6 +640,8 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                                                     Response reply = 
resp.getResult();
                                                     
reply.setCorrelationId(correlationId);
                                                     remoteBroker.oneway(reply);
+                                                    //increment counter when 
messages are received in duplex mode
+                                                    
networkBridgeStatistics.getReceivedCount().increment();
                                                 } catch (IOException error) {
                                                     LOG.error("Exception: {} 
on duplex forward of: {}", error, message);
                                                     
serviceRemoteException(error);
@@ -648,6 +650,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                                         });
                                     } else {
                                         
duplexInboundLocalBroker.oneway(message);
+                                        
networkBridgeStatistics.getReceivedCount().increment();
                                     }
                                     serviceInboundMessage(message);
                                 } else {
@@ -968,7 +971,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
             try {
                 if (command.isMessageDispatch()) {
                     safeWaitUntilStarted();
-                    enqueueCounter.incrementAndGet();
+                    networkBridgeStatistics.getEnqueues().increment();
                     final MessageDispatch md = (MessageDispatch) command;
                     final DemandSubscription sub = 
subscriptionMapByLocalId.get(md.getConsumerId());
                     if (sub != null && md.getMessage() != null && 
sub.incrementOutstandingResponses()) {
@@ -1016,7 +1019,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                                             serviceLocalException(md, 
er.getException());
                                         } else {
                                             localBroker.oneway(new 
MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
-                                            dequeueCounter.incrementAndGet();
+                                            
networkBridgeStatistics.getDequeues().increment();
                                         }
                                     } catch (IOException e) {
                                         serviceLocalException(md, e);
@@ -1033,7 +1036,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                             try {
                                 remoteBroker.oneway(message);
                                 localBroker.oneway(new MessageAck(md, 
MessageAck.INDIVIDUAL_ACK_TYPE, 1));
-                                dequeueCounter.incrementAndGet();
+                                
networkBridgeStatistics.getDequeues().increment();
                             } finally {
                                 sub.decrementOutstandingResponses();
                             }
@@ -1559,12 +1562,17 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
 
     @Override
     public long getDequeueCounter() {
-        return dequeueCounter.get();
+        return networkBridgeStatistics.getDequeues().getCount();
     }
 
     @Override
     public long getEnqueueCounter() {
-        return enqueueCounter.get();
+        return networkBridgeStatistics.getEnqueues().getCount();
+    }
+
+    @Override
+    public NetworkBridgeStatistics getNetworkBridgeStatistics() {
+        return networkBridgeStatistics;
     }
 
     protected boolean isDuplex() {
@@ -1594,8 +1602,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
 
     @Override
     public void resetStats() {
-        enqueueCounter.set(0);
-        dequeueCounter.set(0);
+        networkBridgeStatistics.reset();
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
index 4ed03dc..fb9e3d9 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
@@ -79,6 +79,11 @@ public interface NetworkBridge extends Service {
     long getDequeueCounter();
 
     /**
+     * @return the statistics for this NetworkBridge
+     */
+    NetworkBridgeStatistics getNetworkBridgeStatistics();
+
+    /**
      * @param objectName
      *      The ObjectName assigned to this bridge in the MBean server.
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java
new file mode 100644
index 0000000..6b39d0a
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.network;
+
+import org.apache.activemq.management.CountStatisticImpl;
+import org.apache.activemq.management.StatsImpl;
+
+/**
+ * The Statistics for a NetworkBridge.
+ */
+public class NetworkBridgeStatistics extends StatsImpl {
+
+    protected CountStatisticImpl enqueues;
+    protected CountStatisticImpl dequeues;
+    protected CountStatisticImpl receivedCount;
+
+    public NetworkBridgeStatistics() {
+        enqueues = new CountStatisticImpl("enqueues", "The current number of 
enqueues this bridge has, which is the number of potential messages to be 
forwarded.");
+        dequeues = new CountStatisticImpl("dequeues", "The current number of 
dequeues this bridge has, which is the number of messages received by the 
remote broker.");
+        receivedCount = new CountStatisticImpl("receivedCount", "The number of 
messages that have been received by the NetworkBridge from the remote broker.  
Only applies for Duplex bridges.");
+
+        addStatistic("enqueues", enqueues);
+        addStatistic("dequeues", dequeues);
+        addStatistic("receivedCount", receivedCount);
+    }
+
+    /**
+     * The current number of enqueues this bridge has, which is the number of 
potential messages to be forwarded.
+     * Messages may not be forwarded if there is no subscription.
+     *
+     * @return the enqueues count statistic
+     */
+    public CountStatisticImpl getEnqueues() {
+        return enqueues;
+    }
+
+    /**
+     * The current number of dequeues this bridge has, which is the number of
+     * messages actually sent to and received by the remote broker.
+     *
+     * @return the dequeues count statistic
+     */
+    public CountStatisticImpl getDequeues() {
+        return dequeues;
+    }
+
+    /**
+     * The number of messages that have been received by the NetworkBridge 
from the remote broker.
+     * Only applies for Duplex bridges.
+     *
+     * @return the received count statistic
+     */
+    public CountStatisticImpl getReceivedCount() {
+        return receivedCount;
+    }
+
+    @Override
+    public void reset() {
+        if (this.isDoReset()) {
+            super.reset();
+            enqueues.reset();
+            dequeues.reset();
+            receivedCount.reset();
+        }
+    }
+
+    @Override
+    public void setEnabled(boolean enabled) {
+        super.setEnabled(enabled);
+        enqueues.setEnabled(enabled);
+        dequeues.setEnabled(enabled);
+        receivedCount.setEnabled(enabled);
+    }
+
+    public void setParent(NetworkBridgeStatistics parent) {
+        if (parent != null) {
+            enqueues.setParent(parent.enqueues);
+            dequeues.setParent(parent.dequeues);
+            receivedCount.setParent(parent.receivedCount);
+        } else {
+            enqueues.setParent(null);
+            dequeues.setParent(null);
+            receivedCount.setParent(null);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
index 4867f28..5563ded 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
@@ -35,6 +35,7 @@ import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.network.NetworkBridgeListener;
+import org.apache.activemq.network.NetworkBridgeStatistics;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
@@ -56,6 +57,7 @@ public class AMQ4160Test extends 
JmsMultipleBrokersTestSupport {
      * Since these tests involve wait conditions, protect against indefinite
      * waits (due to unanticipated issues).
      */
+    @Override
     public void setUp() throws Exception {
         setAutoFail(true);
         setMaxTestTime(MAX_TEST_TIME);
@@ -328,6 +330,11 @@ public class AMQ4160Test extends 
JmsMultipleBrokersTestSupport {
                     }
 
                     @Override
+                    public NetworkBridgeStatistics 
getNetworkBridgeStatistics() {
+                        return next.getNetworkBridgeStatistics();
+                    }
+
+                    @Override
                     public void setMbeanObjectName(ObjectName objectName) {
                         next.setMbeanObjectName(objectName);
                     }
@@ -337,6 +344,7 @@ public class AMQ4160Test extends 
JmsMultipleBrokersTestSupport {
                         return next.getMbeanObjectName();
                     }
 
+                    @Override
                     public void resetStats(){
                         next.resetStats();
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
index 14b2e0f..da29dd6 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
@@ -86,4 +86,19 @@ public class DuplexNetworkTest extends SimpleNetworkTest {
         return id;
     }
 
+    @Override
+    protected void assertNetworkBridgeStatistics(final long expectedLocalSent, 
final long expectedRemoteSent) throws Exception {
+
+        final NetworkBridge localBridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return expectedLocalSent == 
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+                        expectedRemoteSent == 
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
+            }
+        }));
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/10c998b0/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
index d7e987c..a49301f 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
@@ -96,6 +96,8 @@ public class SimpleNetworkTest {
         }
         // ensure no more messages received
         assertNull(consumer1.receive(1000));
+
+        assertNetworkBridgeStatistics(MESSAGE_COUNT, 0);
     }
 
     @Test(timeout = 60 * 1000)
@@ -128,6 +130,8 @@ public class SimpleNetworkTest {
             assertNotNull(result);
             LOG.info(result.getText());
         }
+
+        assertNetworkBridgeStatistics(MESSAGE_COUNT, MESSAGE_COUNT);
     }
 
     @Test(timeout = 60 * 1000)
@@ -136,13 +140,15 @@ public class SimpleNetworkTest {
         MessageConsumer excludedConsumer = 
remoteSession.createConsumer(excluded);
         MessageProducer includedProducer = 
localSession.createProducer(included);
         MessageProducer excludedProducer = 
localSession.createProducer(excluded);
-        // allow for consumer infos to perculate arround
+        // allow for consumer infos to percolate around
         Thread.sleep(2000);
         Message test = localSession.createTextMessage("test");
         includedProducer.send(test);
         excludedProducer.send(test);
         assertNull(excludedConsumer.receive(1000));
         assertNotNull(includedConsumer.receive(1000));
+
+        assertNetworkBridgeStatistics(1, 0);
     }
 
     @Test(timeout = 60 * 1000)
@@ -163,6 +169,8 @@ public class SimpleNetworkTest {
         // ensure no more messages received
         assertNull(consumer1.receive(1000));
         assertNull(consumer2.receive(1000));
+
+        assertNetworkBridgeStatistics(MESSAGE_COUNT, 0);
     }
 
     private void waitForConsumerRegistration(final BrokerService 
brokerService, final int min, final ActiveMQDestination destination) throws 
Exception {
@@ -319,4 +327,20 @@ public class SimpleNetworkTest {
     protected BrokerService createRemoteBroker() throws Exception {
         return createBroker(getRemoteBrokerURI());
     }
+
+    protected void assertNetworkBridgeStatistics(final long expectedLocalSent, 
final long expectedRemoteSent) throws Exception {
+
+        final NetworkBridge localBridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+        final NetworkBridge remoteBridge = 
remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return expectedLocalSent == 
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+                       0 == 
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
+                       expectedRemoteSent == 
remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+                       0 == 
remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
+            }
+        }));
+    }
 }

Reply via email to