This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 944ca6c  AMQ-8183 - prevent infinite loop when maxFrameSize is exceeded
944ca6c is described below

commit 944ca6c7e1dc4f44b61ce05b77c180c686e2ae76
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
AuthorDate: Thu Mar 11 09:59:36 2021 -0500

    AMQ-8183 - prevent infinite loop when maxFrameSize is exceeded
    
    This makes sure the nio transport thread properly terminates if
    maxFrameSize is exceeded with OpenWire to prevent an infinite loop that
    uses up all the cpu
---
 .../activemq/transport/nio/NIOSSLTransport.java    |   5 +
 .../activemq/transport/nio/NIOTransport.java       |   4 +
 .../transport/nio/NIOMaxFrameSizeCleanupTest.java  | 154 +++++++++++++++++++++
 3 files changed, 163 insertions(+)

diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
index 3bcb0e4..4c944f4 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
@@ -257,6 +257,11 @@ public class NIOSSLTransport extends NIOTransport {
             plain.position(plain.limit());
 
             while (true) {
+                //If the transport was already stopped then break
+                if (this.isStopped()) {
+                    return;
+                }
+
                 if (!plain.hasRemaining()) {
 
                     int readCount = secureRead(plain);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
index 58ee1aa..7fe5bad 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
@@ -108,6 +108,10 @@ public class NIOTransport extends TcpTransport {
     protected void serviceRead() {
         try {
             while (true) {
+                //If the transport was already stopped then break
+                if (this.isStopped()) {
+                    return;
+                }
 
                 int readSize = readFromBuffer();
                 if (readSize == -1) {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java
new file mode 100644
index 0000000..df2e691
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static junit.framework.TestCase.assertTrue;
+
+//Test for AMQ-8183
+public class NIOMaxFrameSizeCleanupTest {
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = 
"src/test/resources/org/apache/activemq/security/broker1.ks";
+    public static final String TRUST_KEYSTORE = 
"src/test/resources/org/apache/activemq/security/broker1.ks";
+
+    private BrokerService broker;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+    }
+
+    @After
+    public void after() throws Exception {
+        stopBroker(broker);
+    }
+
+    public BrokerService createBroker(String connectorName, String 
connectorString) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        TransportConnector connector = broker.addConnector(connectorString);
+        connector.setName(connectorName);
+        broker.start();
+        broker.waitUntilStarted();
+        return broker;
+    }
+
+    public void stopBroker(BrokerService broker) throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testMaxFrameSizeCleanupNio() throws Exception {
+        String transportType = "nio";
+        broker = createBroker(transportType, transportType + 
"://localhost:0?wireFormat.maxFrameSize=1024");
+        testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort());
+    }
+
+    @Test
+    public void testMaxFrameSizeCleanupAutoNio() throws Exception {
+        String transportType = "auto+nio";
+        broker = createBroker(transportType, transportType + 
"://localhost:0?wireFormat.maxFrameSize=1024");
+        testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort());
+    }
+
+    @Test
+    public void testMaxFrameSizeCleanupNioSsl() throws Exception {
+        String transportType = "nio+ssl";
+        broker = createBroker(transportType, transportType +
+                
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
+        testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort()
+                + "?socket.verifyHostName=false");
+    }
+
+    @Test
+    public void testMaxFrameSizeCleanupAutoNioSsl() throws Exception {
+        String transportType = "auto+nio+ssl";
+        broker = createBroker(transportType, transportType +
+                
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
+        testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort()
+                + "?socket.verifyHostName=false");
+    }
+
+    protected void testMaxFrameSizeCleanup(String transportType, String 
clientUri) throws Exception {
+        final List<Connection> connections = new ArrayList<>();
+        final ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(clientUri);
+        for (int i = 0; i < 10; i++) {
+            Connection connection = factory.createConnection();
+            connection.start();
+            connections.add(connection);
+        }
+
+        //Generate a body that is too large
+        StringBuffer body = new StringBuffer();
+        Random r = new Random();
+        for (int i = 0; i < 10000; i++) {
+            body.append(r.nextInt());
+        }
+
+        //Try sending 10 large messages rapidly in a loop to make sure all
+        //nio threads are properly terminated
+        for (int i = 0; i < 10; i++) {
+            boolean exception = false;
+            try {
+                Connection connection = connections.get(i);
+                Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                Queue destination = session.createQueue("TEST");
+                MessageProducer producer = session.createProducer(destination);
+                producer.send(session.createTextMessage(body.toString()));
+            } catch (Exception e) {
+                //expected
+                exception = true;
+            }
+            assertTrue("Should have gotten a transport exception", exception);
+        }
+
+        final ThreadPoolExecutor e = (ThreadPoolExecutor) 
SelectorManager.getInstance().getSelectorExecutor();
+        //Verify that all connections are removed
+        assertTrue(Wait.waitFor(() -> 
broker.getConnectorByName(transportType).getConnections().size() == 0,
+                5000, 500));
+        //Verify no more active transport connections in the selector thread 
pool. This was broken
+        //due to AMQ-7106 before the fix in AMQ-8183
+        assertTrue(Wait.waitFor(() -> e.getActiveCount() == 0, 5000, 500));
+    }
+}

Reply via email to