[AMQ-6640] allign use of sync vm transport usage on duplex end of 
networkconnector with initiator end. only duplexinbound for forwarding is async 
to allow thread for responses. vm transport options applied in one place and 
test more deterministic w.r.t the hang scenario


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d84a5865
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d84a5865
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d84a5865

Branch: refs/heads/master
Commit: d84a58656c0b3fcb2aad04e47ef843bf379a25f0
Parents: 770a73e
Author: gtully <[email protected]>
Authored: Wed Mar 29 23:50:47 2017 +0100
Committer: gtully <[email protected]>
Committed: Wed Mar 29 23:50:47 2017 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |  8 +------
 .../activemq/broker/TransportConnection.java    |  2 +-
 .../network/DemandForwardingBridgeSupport.java  | 22 ++++++++++++--------
 .../activemq/network/NetworkBridgeFactory.java  | 13 +++++++++---
 .../activemq/network/NetworkConnector.java      |  2 +-
 .../activemq/network/NetworkRouteTest.java      |  6 +++---
 .../usecases/DuplexAdvisoryRaceTest.java        | 11 +++++-----
 .../src/test/resources/log4j.properties         |  1 +
 8 files changed, 35 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 8cd15d8..de70d29 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -2617,12 +2617,6 @@ public class BrokerService implements Service {
             setTransportConnectors(al);
         }
         this.slave = false;
-        URI uri = getVmConnectorURI();
-        Map<String, String> map = new 
HashMap<>(URISupport.parseParameters(uri));
-        map.put("async", "false");
-        map.put("create","false");
-        uri = URISupport.createURIWithQuery(uri, 
URISupport.createQueryString(map));
-
         if (!stopped.get()) {
             ThreadPoolExecutor networkConnectorStartExecutor = null;
             if (isNetworkConnectorStartAsync()) {
@@ -2642,7 +2636,7 @@ public class BrokerService implements Service {
 
             for (Iterator<NetworkConnector> iter = 
getNetworkConnectors().iterator(); iter.hasNext();) {
                 final NetworkConnector connector = iter.next();
-                connector.setLocalUri(uri);
+                connector.setLocalUri(getVmConnectorURI());
                 startNetworkConnector(connector, durableDestinations, 
networkConnectorStartExecutor);
             }
             if (networkConnectorStartExecutor != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index a58eda3..69d29bc 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -1428,7 +1428,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
                     }
                     setDuplexNetworkConnectorId(duplexNetworkConnectorId);
                 }
-                Transport localTransport = 
NetworkBridgeFactory.createLocalTransport(broker);
+                Transport localTransport = 
NetworkBridgeFactory.createLocalTransport(broker.getVmConnectorURI());
                 Transport remoteBridgeTransport = transport;
                 if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
                     // the vm transport case is already wrapped

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 459501c..e343ad6 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -193,7 +193,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
             
networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
 
             if (isDuplex()) {
-                duplexInboundLocalBroker = 
NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
+                duplexInboundLocalBroker = 
NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
                 duplexInboundLocalBroker.setTransportListener(new 
DefaultTransportListener() {
 
                     @Override
@@ -830,9 +830,18 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         demandConsumerDispatched++;
         if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
                 (configuration.getAdvisoryAckPercentage() / 100f))) {
-            MessageAck ack = new MessageAck(message, 
MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
+            final MessageAck ack = new MessageAck(message, 
MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
             ack.setConsumerId(demandConsumerInfo.getConsumerId());
-            remoteBroker.oneway(ack);
+            brokerService.getTaskRunnerFactory().execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        remoteBroker.oneway(ack);
+                    } catch (IOException e) {
+                        LOG.warn("Failed to send advisory ack " + ack, e);
+                    }
+                }
+            });
             demandConsumerDispatched = 0;
         }
     }
@@ -1039,12 +1048,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
 
     protected void addSubscription(DemandSubscription sub) throws IOException {
         if (sub != null) {
-            if (isCreatedByDuplex() && 
!isDuplicateSuppressionOff(sub.getRemoteInfo())) {
-                // async vm transport on duplex end, need to wait for 
completion
-                localBroker.request(sub.getLocalInfo());
-            } else {
-                localBroker.oneway(sub.getLocalInfo());
-            }
+            localBroker.oneway(sub.getLocalInfo());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
index 0e938ae..32711a4 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
@@ -58,10 +58,17 @@ public final class NetworkBridgeFactory {
         return result;
     }
 
-    public static Transport createLocalTransport(Broker broker) throws 
Exception {
-        URI uri = broker.getVmConnectorURI();
+    public static Transport createLocalTransport(URI uri) throws Exception {
+        return createLocalTransport(uri, false);
+    }
+
+    public static Transport createLocalAsyncTransport(URI uri) throws 
Exception {
+        return createLocalTransport(uri, true);
+    }
+
+    private static Transport createLocalTransport(URI uri, boolean async) 
throws Exception {
         HashMap<String, String> map = new HashMap<String, 
String>(URISupport.parseParameters(uri));
-        map.put("async", "true");
+        map.put("async", String.valueOf(async));
         map.put("create", "false"); // we don't want a vm connect during 
shutdown to trigger a broker create
         uri = URISupport.createURIWithQuery(uri, 
URISupport.createQueryString(map));
         return TransportFactory.connect(uri);

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java
index 5faf94c..f943b82 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java
@@ -140,7 +140,7 @@ public abstract class NetworkConnector extends 
NetworkBridgeConfiguration implem
     }
 
     protected Transport createLocalTransport() throws Exception {
-        return TransportFactory.connect(localURI);
+        return NetworkBridgeFactory.createLocalTransport(localURI);
     }
 
     public static ActiveMQDestination[] getDurableTopicDestinations(final 
Set<ActiveMQDestination> durableDestinations) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
index 2b363b3..afa438e 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
@@ -63,7 +63,7 @@ public class NetworkRouteTest {
 
     @Test
     public void verifyNoRemoveOnOneConduitRemove() throws Exception {
-        
EasyMock.expect(localBroker.request(EasyMock.isA(ConsumerInfo.class))).andReturn(null);
+        localBroker.oneway(EasyMock.isA(ConsumerInfo.class));
         control.replay();
 
         remoteListener.onCommand(path2Msg);
@@ -76,7 +76,7 @@ public class NetworkRouteTest {
     @Test
     public void addAndRemoveOppositeOrder() throws Exception {
         // from (1)
-        localBroker.request(EasyMock.isA(ConsumerInfo.class));
+        localBroker.oneway(EasyMock.isA(ConsumerInfo.class));
         ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
         // from (2a)
         remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), 
EasyMock.isA(ResponseCallback.class));
@@ -123,7 +123,7 @@ public class NetworkRouteTest {
     @Test
     public void addAndRemoveSameOrder() throws Exception {
         // from (1)
-        localBroker.request(EasyMock.isA(ConsumerInfo.class));
+        localBroker.oneway(EasyMock.isA(ConsumerInfo.class));
         ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
 
         // from (2a)

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
index 9919ec9..b34b67e 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
@@ -126,9 +126,9 @@ public class DuplexAdvisoryRaceTest {
                 + "?jms.watchTopicAdvisories=false");
 
         // populate dests
-        final int numDests = 200;
-        final int numMessagesPerDest = 300;
-        final int numConsumersPerDest = 100;
+        final int numDests = 800;
+        final int numMessagesPerDest = 50;
+        final int numConsumersPerDest = 5;
         populate(brokerAFactory, 0, numDests/2, numMessagesPerDest);
         populate(brokerBFactory, numDests/2, numDests, numMessagesPerDest);
 
@@ -148,7 +148,7 @@ public class DuplexAdvisoryRaceTest {
                 LOG.info("received: " + responseReceived.get());
                 return responseReceived.get() >= numMessagesPerDest * numDests;
             }
-        }, 2*60*1000)) {
+        }, 5*60*1000)) {
 
            org.apache.activemq.TestSupport.dumpAllThreads("DD");
 
@@ -177,7 +177,6 @@ public class DuplexAdvisoryRaceTest {
         connection.start();
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         final BytesMessage message = session.createBytesMessage();
-        //message.writeBytes(new byte[50]);
         MessageProducer producer = session.createProducer(null);;
         for (int i=minDest; i<maxDest; i++) {
             Destination destination = qFromInt(i);
@@ -236,7 +235,7 @@ public class DuplexAdvisoryRaceTest {
 
     protected NetworkConnector bridgeBrokers(BrokerService localBroker, 
BrokerService remoteBroker) throws Exception {
 
-        String uri = "static:(failover:(" + networkConnectorUrlString + 
"?socketBufferSize=1024)?maxReconnectAttempts=0)";
+        String uri = "static:(failover:(" + networkConnectorUrlString + 
"?socketBufferSize=1024&trace=false)?maxReconnectAttempts=0)";
 
         NetworkConnector connector = new DiscoveryNetworkConnector(new 
URI(uri));
         connector.setName(localBroker.getBrokerName() + "-to-" + 
remoteBroker.getBrokerName());

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-unit-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/log4j.properties 
b/activemq-unit-tests/src/test/resources/log4j.properties
index 4704dbc..42d8c80 100644
--- a/activemq-unit-tests/src/test/resources/log4j.properties
+++ b/activemq-unit-tests/src/test/resources/log4j.properties
@@ -24,6 +24,7 @@ log4j.rootLogger=INFO, out, stdout
 #log4j.logger.org.apache.activemq.store.kahadb.scheduler=DEBUG
 #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
 #log4j.logger.org.apache.activemq.transport.failover=TRACE
+#log4j.logger.org.apache.activemq.transport.TransportLogger.Connection=TRACE
 #log4j.logger.org.apache.activemq.store.jdbc=TRACE
 #log4j.logger.org.apache.activemq.store.kahadb=TRACE
 
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG

Reply via email to