Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x a82bd3cf7 -> 4fc16630e
Revert "https://issues.apache.org/jira/browse/AMQ-6366" This reverts commit a82bd3cf721e32272b2cf3dee3aa1afcc726c3cb. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4fc16630 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4fc16630 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4fc16630 Branch: refs/heads/activemq-5.13.x Commit: 4fc16630ea94a8eda96ffa99a8e559ca4355c02a Parents: a82bd3c Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Tue Jul 19 15:19:58 2016 -0400 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Tue Jul 19 15:19:58 2016 -0400 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 14 +- .../apache/activemq/usecases/AMQ6366Test.java | 141 ------------------- 2 files changed, 4 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4fc16630/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index b16383a..350f529 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -27,7 +27,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -116,9 +115,9 @@ public class TransportConnection implements 
Connection, Task, CommandVisitor { protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; // The broker and wireformat info that was exchanged. protected BrokerInfo brokerInfo; - protected final List<Command> dispatchQueue = new LinkedList<>(); + protected final List<Command> dispatchQueue = new LinkedList<Command>(); protected TaskRunner taskRunner; - protected final AtomicReference<Throwable> transportException = new AtomicReference<>(); + protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>(); protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); private final Transport transport; private MessageAuthorizationPolicy messageAuthorizationPolicy; @@ -140,8 +139,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final AtomicBoolean stopping = new AtomicBoolean(false); private final CountDownLatch stopped = new CountDownLatch(1); private final AtomicBoolean asyncException = new AtomicBoolean(false); - private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>(); - private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>(); + private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); + private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>(); private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); private ConnectionContext context; private boolean networkConnection; @@ -1395,11 +1394,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { listener.setCreatedByDuplex(true); duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); duplexBridge.setBrokerService(broker.getBrokerService()); - Set<ActiveMQDestination> durableDestinations = broker.getDurableDestinations(); - 
//Need to set durableDestinations to properly restart subs when dynamicOnly=false - if (durableDestinations != null) { - duplexBridge.setDurableDestinations(broker.getDurableDestinations().toArray(new ActiveMQDestination[0])); - } // now turn duplex off this side info.setDuplexConnection(false); duplexBridge.setCreatedByDuplex(true); http://git-wip-us.apache.org/repos/asf/activemq/blob/4fc16630/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java deleted file mode 100644 index ec75232..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.usecases; - -import java.io.File; -import java.net.URI; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import javax.jms.MessageConsumer; - -import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.DurableTopicSubscription; -import org.apache.activemq.broker.region.Topic; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.network.DiscoveryNetworkConnector; -import org.apache.activemq.network.NetworkConnector; -import org.apache.activemq.util.IOHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Show that both directions of a duplex bridge will properly restart the - * network durable consumers if dynamicOnly is false. - */ -public class AMQ6366Test extends JmsMultipleBrokersTestSupport { - protected static final Logger LOG = LoggerFactory.getLogger(AMQ6366Test.class); - final ActiveMQTopic dest = new ActiveMQTopic("TEST.FOO"); - - - /** - * This test works even before AMQ6366 - * @throws Exception - */ - public void testDuplexDurableSubRestarted() throws Exception { - testNonDurableReceiveThrougRestart("BrokerA", "BrokerB"); - } - - /** - * This test failed before AMQ6366 because the NC durable consumer was - * never properly activated. 
- * - * @throws Exception - */ - public void testDuplexDurableSubRestartedReverse() throws Exception { - testNonDurableReceiveThrougRestart("BrokerB", "BrokerA"); - } - - protected void testNonDurableReceiveThrougRestart(String pubBroker, String conBroker) throws Exception { - NetworkConnector networkConnector = bridgeBrokerPair("BrokerA", "BrokerB"); - - startAllBrokers(); - waitForBridgeFormation(); - - MessageConsumer client = createDurableSubscriber(conBroker, dest, "sub1"); - client.close(); - - Thread.sleep(1000); - networkConnector.stop(); - Thread.sleep(1000); - - Set<ActiveMQDestination> durableDests = new HashSet<>(); - durableDests.add(dest); - //Normally set on broker start from the persistence layer but - //simulate here since we just stopped and started the network connector - //without a restart - networkConnector.setDurableDestinations(durableDests); - networkConnector.start(); - waitForBridgeFormation(); - - // Send messages - sendMessages(pubBroker, dest, 1); - Thread.sleep(1000); - - Topic destination = (Topic) brokers.get(conBroker).broker.getDestination(dest); - DurableTopicSubscription sub = destination.getDurableTopicSubs(). 
- values().toArray(new DurableTopicSubscription[0])[0]; - - //Assert that the message made it to the other broker - assertEquals(1, sub.getSubscriptionStatistics().getEnqueues().getCount()); - } - - @Override - protected void configureBroker(BrokerService broker) { - broker.getManagementContext().setCreateConnector(false); - broker.setAdvisorySupport(true); - } - - protected NetworkConnector bridgeBrokerPair(String localBrokerName, String remoteBrokerName) throws Exception { - BrokerService localBroker = brokers.get(localBrokerName).broker; - BrokerService remoteBroker = brokers.get(remoteBrokerName).broker; - - List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors(); - URI remoteURI; - if (!transportConnectors.isEmpty()) { - remoteURI = transportConnectors.get(0).getConnectUri(); - String uri = "static:(" + remoteURI + ")"; - NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)); - connector.setDynamicOnly(false); // so matching durable subs are loaded on start - connector.setStaticBridge(false); - connector.setDuplex(true); - connector.addDynamicallyIncludedDestination(dest); - localBroker.addNetworkConnector(connector); - return connector; - } else { - throw new Exception("Remote broker has no registered connectors."); - } - } - - @Override - public void setUp() throws Exception { - File dataDir = new File(IOHelper.getDefaultDataDirectory()); - LOG.info("Delete dataDir.." + dataDir.getCanonicalPath()); - org.apache.activemq.TestSupport.recursiveDelete(dataDir); - super.setAutoFail(true); - super.setUp(); - createBroker(new URI( - "broker:(tcp://0.0.0.0:0)/BrokerA")); - createBroker(new URI( - "broker:(tcp://0.0.0.0:0)/BrokerB")); - - } -}