Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 161ba22f6 -> df7aa6e89


https://issues.apache.org/jira/browse/AMQ-6561

Stop connections on all connection attempt errors, not just security
exceptions, to prevent dangling open sockets.

(cherry picked from commit d9c74d7317677725a52466349f7ed91911664d3d)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/df7aa6e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/df7aa6e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/df7aa6e8

Branch: refs/heads/activemq-5.14.x
Commit: df7aa6e892853fd442739a49417ecdf34a6cac63
Parents: 161ba22
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Thu Jan 12 10:21:08 2017 -0500
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Thu Jan 12 10:22:19 2017 -0500

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |   7 +-
 .../tcp/TcpTransportCloseSocketTest.java        | 133 +++++++++++++++++++
 2 files changed, 136 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/df7aa6e8/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 0507f2a..fc61bbd 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -856,10 +856,9 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
             }
             unregisterConnectionState(info.getConnectionId());
             LOG.warn("Failed to add Connection {} due to {}", 
info.getConnectionId(), e);
-            if (e instanceof SecurityException) {
-                // close this down - in case the peer of this transport 
doesn't play nice
-                delayedStop(2000, "Failed with SecurityException: " + 
e.getLocalizedMessage(), e);
-            }
+            //AMQ-6561 - stop for all exceptions on addConnection
+            // close this down - in case the peer of this transport doesn't 
play nice
+            delayedStop(2000, "Failed with SecurityException: " + 
e.getLocalizedMessage(), e);
             throw e;
         }
         if (info.isManageable()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/df7aa6e8/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketTest.java
new file mode 100644
index 0000000..ad2080b
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportCloseSocketTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test for https://issues.apache.org/jira/browse/AMQ-6561 to make sure sockets
+ * are closed on all connection attempt errors
+ */
+@RunWith(Parameterized.class)
+public class TcpTransportCloseSocketTest {
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = 
"src/test/resources/server.keystore";
+    public static final String TRUST_KEYSTORE = 
"src/test/resources/client.keystore";
+
+    private String uri;
+    private final String protocol;
+    private BrokerService brokerService;
+
+    @Parameters(name="protocol={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                  {"auto+nio+ssl"},
+                  {"auto+ssl"},
+                  {"ssl"},
+                  {"tcp"}
+            });
+    }
+
+    static {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+    }
+
+    @Before
+    public void before() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+
+        TransportConnector connector = brokerService.addConnector(protocol + 
"://localhost:0");
+        connector.setName("tcp");
+        uri = connector.getPublishableConnectString();
+        this.brokerService = brokerService;
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    /**
+     * @param isNio
+     */
+    public TcpTransportCloseSocketTest(String protocol) {
+        this.protocol = protocol;
+    }
+
+    //We want to make sure that the socket will be closed if there as an error 
on broker.addConnection
+    //even if the client doesn't close the connection to prevent dangling open 
sockets
+    @Test(timeout = 60000)
+    public void testDuplicateClientIdCloseConnection() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+        factory.setBrokerURL(uri);
+        factory.setClientID("id");
+
+        TcpTransportServer server = (TcpTransportServer) 
brokerService.getTransportConnectorByName("tcp").getServer();
+
+        //Try and create 2 connections, the second should fail because of a 
duplicate clientId
+        int failed = 0;
+        for (int i = 0; i < 2; i++) {
+            try {
+                factory.createConnection().start();
+            } catch (Exception e) {
+                e.printStackTrace();
+                failed++;
+            }
+        }
+
+        assertEquals(1, failed);
+        //after 2 seconds the connection should be terminated by the broker 
because of the exception
+        //on broker.addConnection
+        assertTrue(Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return server.getCurrentTransportCount().get() == 1;
+            }
+
+        }, 10000, 500));
+    }
+}

Reply via email to