This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 67a2edb  [AMQ-8412] Update client-side maxFrameSize handling to be 
more symetrical with server-side
     new 43aa180  Merge pull request #756 from mattrpav/AMQ-8412c
67a2edb is described below

commit 67a2edbf0d7d3d43726a50743270b15b29759dec
Author: Matt Pavlovich <m...@hyte.io>
AuthorDate: Sat Feb 5 15:22:09 2022 -0600

    [AMQ-8412] Update client-side maxFrameSize handling to be more symetrical 
with server-side
    
     - Handle in the OpenWireFormat class
     - Add unit tests to confirm
     - Verify compression is accounted for
     - Verify the ability to disable using wireFormat.maxFrameSizeEnabled=false
     - [cshannon] Reworked max frame size test case to add in all transports 
and all client/server cases
---
 .../org/apache/activemq/ActiveMQConnection.java    |   6 -
 .../activemq/MaxFrameSizeExceededException.java    |  31 +++
 .../apache/activemq/openwire/OpenWireFormat.java   |  30 +-
 .../activemq/openwire/OpenWireFormatFactory.java   |  10 +
 .../activemq/transport/nio/NIOSSLTransport.java    |   9 +-
 .../activemq/transport/nio/NIOTransport.java       |   9 +-
 .../apache/activemq/util/IOExceptionSupport.java   |   4 +-
 .../apache/activemq/util/JMSExceptionSupport.java  |   8 +
 .../transport/MaxFrameSizeEnabledTest.java         | 302 +++++++++++++++++++++
 .../transport/nio/NIOMaxFrameSizeCleanupTest.java  |   8 +-
 10 files changed, 395 insertions(+), 22 deletions(-)

diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index ef48e84..58d4197 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -1448,12 +1448,6 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
         if (isClosed()) {
             throw new ConnectionClosedException();
         } else {
-            if(command.isMessage()) {
-                int tmpMsgSize = Message.class.cast(command).getSize();
-                if(maxFrameSize.get() < tmpMsgSize) {
-                    throw new JMSException("Message size: " +  tmpMsgSize + " 
exceeds maximum allowed by broker: " + maxFrameSize.get(), "41300");
-                }
-            }
             try {
                 Response response = (Response)(timeout > 0
                         ? this.transport.request(command, timeout)
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/MaxFrameSizeExceededException.java
 
b/activemq-client/src/main/java/org/apache/activemq/MaxFrameSizeExceededException.java
new file mode 100644
index 0000000..a9d0705
--- /dev/null
+++ 
b/activemq-client/src/main/java/org/apache/activemq/MaxFrameSizeExceededException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+/**
+ * An exception thrown when max frame size is exceeded.
+ *
+ * 
+ */
+public class MaxFrameSizeExceededException extends IOException {
+    private static final long serialVersionUID = -7681404582227153308L;
+
+    public MaxFrameSizeExceededException(String message) {
+        super(message);
+    }
+}
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
index a28ea64..cdda5ac 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
@@ -55,6 +55,7 @@ public final class OpenWireFormat implements WireFormat {
     private boolean cacheEnabled;
     private boolean tightEncodingEnabled;
     private boolean sizePrefixDisabled;
+    private boolean maxFrameSizeEnabled = true;
     private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
 
     // The following fields are used for value caching
@@ -80,7 +81,8 @@ public final class OpenWireFormat implements WireFormat {
         return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
                ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
                ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
-               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
+               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000)
+               ^ (maxFrameSizeEnabled ? 0x00010000 : 0x00020000);
     }
 
     public OpenWireFormat copy() {
@@ -91,6 +93,7 @@ public final class OpenWireFormat implements WireFormat {
         answer.tightEncodingEnabled = tightEncodingEnabled;
         answer.sizePrefixDisabled = sizePrefixDisabled;
         answer.preferedWireFormatInfo = preferedWireFormatInfo;
+        answer.maxFrameSizeEnabled = maxFrameSizeEnabled;
         return answer;
     }
 
@@ -102,14 +105,15 @@ public final class OpenWireFormat implements WireFormat {
         OpenWireFormat o = (OpenWireFormat)object;
         return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == 
cacheEnabled
                && o.version == version && o.tightEncodingEnabled == 
tightEncodingEnabled
-               && o.sizePrefixDisabled == sizePrefixDisabled;
+               && o.sizePrefixDisabled == sizePrefixDisabled
+               && o.maxFrameSizeEnabled == maxFrameSizeEnabled;
     }
 
 
     @Override
     public String toString() {
         return "OpenWireFormat{version=" + version + ", cacheEnabled=" + 
cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", 
tightEncodingEnabled="
-               + tightEncodingEnabled + ", sizePrefixDisabled=" + 
sizePrefixDisabled +  ", maxFrameSize=" + maxFrameSize + "}";
+               + tightEncodingEnabled + ", sizePrefixDisabled=" + 
sizePrefixDisabled +  ", maxFrameSize=" + maxFrameSize + ", 
maxFrameSizeEnabled=" + maxFrameSizeEnabled + "}";
         // return "OpenWireFormat{id="+id+",
         // tightEncodingEnabled="+tightEncodingEnabled+"}";
     }
@@ -142,6 +146,10 @@ public final class OpenWireFormat implements WireFormat {
                 size += dsm.tightMarshal1(this, c, bs);
                 size += bs.marshalledSize();
 
+                if(maxFrameSizeEnabled && size > maxFrameSize) {
+                    throw IOExceptionSupport.createFrameSizeException(size, 
maxFrameSize);
+                }
+
                 bytesOut.restart(size);
                 if (!sizePrefixDisabled) {
                     bytesOut.writeInt(size);
@@ -193,7 +201,7 @@ public final class OpenWireFormat implements WireFormat {
                 // size");
             }
 
-            if (size > maxFrameSize) {
+            if (maxFrameSizeEnabled && size > maxFrameSize) {
                 throw IOExceptionSupport.createFrameSizeException(size, 
maxFrameSize);
             }
         }
@@ -226,6 +234,10 @@ public final class OpenWireFormat implements WireFormat {
                 size += dsm.tightMarshal1(this, c, bs);
                 size += bs.marshalledSize();
 
+                if(maxFrameSizeEnabled && size > maxFrameSize) {
+                    throw IOExceptionSupport.createFrameSizeException(size, 
maxFrameSize);
+                }
+
                 if (!sizePrefixDisabled) {
                     dataOut.writeInt(size);
                 }
@@ -266,7 +278,7 @@ public final class OpenWireFormat implements WireFormat {
         DataInput dataIn = dis;
         if (!sizePrefixDisabled) {
             int size = dis.readInt();
-            if (size > maxFrameSize) {
+            if (maxFrameSizeEnabled && size > maxFrameSize) {
                 throw IOExceptionSupport.createFrameSizeException(size, 
maxFrameSize);
             }
             // int size = dis.readInt();
@@ -605,6 +617,14 @@ public final class OpenWireFormat implements WireFormat {
         this.maxFrameSize = maxFrameSize;
     }
 
+    public boolean isMaxFrameSizeEnabled() {
+        return maxFrameSizeEnabled;
+    }
+
+    public void setMaxFrameSizeEnabled(boolean maxFrameSizeEnabled) {
+        this.maxFrameSizeEnabled = maxFrameSizeEnabled;
+    }
+
     public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
 
         if (preferedWireFormatInfo == null) {
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
index 2614ad7..2083639 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
@@ -41,6 +41,7 @@ public class OpenWireFormatFactory implements 
WireFormatFactory {
     private long maxInactivityDurationInitalDelay = 10*1000;
     private int cacheSize = 1024;
     private long maxFrameSize = OpenWireFormat.DEFAULT_MAX_FRAME_SIZE;
+    private boolean maxFrameSizeEnabled = true;
     private String host=null;
     private String providerName = ActiveMQConnectionMetaData.PROVIDER_NAME;
     private String providerVersion = 
ActiveMQConnectionMetaData.PROVIDER_VERSION;
@@ -80,6 +81,7 @@ public class OpenWireFormatFactory implements 
WireFormatFactory {
         OpenWireFormat f = new OpenWireFormat(version);
         f.setMaxFrameSize(maxFrameSize);
         f.setPreferedWireFormatInfo(info);
+        f.setMaxFrameSizeEnabled(maxFrameSizeEnabled);
         return f;
     }
 
@@ -203,4 +205,12 @@ public class OpenWireFormatFactory implements 
WireFormatFactory {
     public void setIncludePlatformDetails(boolean includePlatformDetails) {
         this.includePlatformDetails = includePlatformDetails;
     }
+
+    public void setMaxFrameSizeEnabled(boolean maxFrameSizeEnabled) {
+        this.maxFrameSizeEnabled = maxFrameSizeEnabled;
+    }
+
+    public boolean isMaxFrameSizeEnabled() {
+        return this.maxFrameSizeEnabled;
+    }
 }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
index d0e2fc8..13b6c54 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
@@ -40,6 +40,7 @@ import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
 
+import org.apache.activemq.MaxFrameSizeExceededException;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -335,9 +336,11 @@ public class NIOSSLTransport extends NIOTransport {
             }
 
             if (wireFormat instanceof OpenWireFormat) {
-                long maxFrameSize = ((OpenWireFormat) 
wireFormat).getMaxFrameSize();
-                if (nextFrameSize > maxFrameSize) {
-                    throw new IOException("Frame size of " + (nextFrameSize / 
(1024 * 1024)) +
+                OpenWireFormat openWireFormat = (OpenWireFormat) wireFormat;
+                long maxFrameSize = openWireFormat.getMaxFrameSize();
+
+                if (openWireFormat.isMaxFrameSizeEnabled() && nextFrameSize > 
maxFrameSize) {
+                    throw new MaxFrameSizeExceededException("Frame size of " + 
(nextFrameSize / (1024 * 1024)) +
                                           " MB larger than max allowed " + 
(maxFrameSize / (1024 * 1024)) + " MB");
                 }
             }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
index 7fe5bad..6109949 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
@@ -29,6 +29,7 @@ import java.nio.channels.SocketChannel;
 
 import javax.net.SocketFactory;
 
+import org.apache.activemq.MaxFrameSizeExceededException;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransport;
@@ -139,9 +140,11 @@ public class NIOTransport extends TcpTransport {
                     nextFrameSize = inputBuffer.getInt() + 4;
 
                     if (wireFormat instanceof OpenWireFormat) {
-                        long maxFrameSize = 
((OpenWireFormat)wireFormat).getMaxFrameSize();
-                        if (nextFrameSize > maxFrameSize) {
-                            throw new IOException("Frame size of " + 
(nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + 
(maxFrameSize / (1024 * 1024)) + " MB");
+                        OpenWireFormat openWireFormat = 
(OpenWireFormat)wireFormat;
+                        long maxFrameSize = openWireFormat.getMaxFrameSize();
+
+                        if (openWireFormat.isMaxFrameSizeEnabled() && 
nextFrameSize > maxFrameSize) {
+                            throw new MaxFrameSizeExceededException("Frame 
size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + 
(maxFrameSize / (1024 * 1024)) + " MB");
                         }
                     }
 
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/util/IOExceptionSupport.java
 
b/activemq-client/src/main/java/org/apache/activemq/util/IOExceptionSupport.java
index 7aa2fc4..0db3ce4 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/util/IOExceptionSupport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/util/IOExceptionSupport.java
@@ -19,6 +19,8 @@ package org.apache.activemq.util;
 import java.io.IOException;
 import java.math.BigInteger;
 
+import org.apache.activemq.MaxFrameSizeExceededException;
+
 public final class IOExceptionSupport {
 
     private IOExceptionSupport() {
@@ -49,7 +51,7 @@ public final class IOExceptionSupport {
     }
 
     public static IOException createFrameSizeException(int size, long maxSize) 
{
-        return new IOException("Frame size of " + 
toHumanReadableSizeString(size) +
+        return new MaxFrameSizeExceededException("Frame size of " + 
toHumanReadableSizeString(size) +
             " larger than max allowed " + toHumanReadableSizeString(maxSize));
     }
 
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
 
b/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
index 5d55273..a73f01f 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
@@ -21,6 +21,8 @@ import javax.jms.JMSSecurityException;
 import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
 
+import org.apache.activemq.MaxFrameSizeExceededException;
+
 public final class JMSExceptionSupport {
 
     private JMSExceptionSupport() {
@@ -61,6 +63,12 @@ public final class JMSExceptionSupport {
         if (cause instanceof JMSException) {
             return (JMSException)cause;
         }
+        if (cause instanceof MaxFrameSizeExceededException) {
+            JMSException jmsException = new JMSException(cause.getMessage(), 
"41300");
+            jmsException.setLinkedException(cause);
+            jmsException.initCause(cause);
+            return jmsException;
+        }
         String msg = cause.getMessage();
         if (msg == null || msg.length() == 0) {
             msg = cause.toString();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
new file mode 100644
index 0000000..d2ce82e
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.MaxFrameSizeExceededException;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.*;
+
+//Test for AMQ-8142
+@RunWith(value = Parameterized.class)
+public class MaxFrameSizeEnabledTest {
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = 
"src/test/resources/org/apache/activemq/security/broker1.ks";
+    public static final String TRUST_KEYSTORE = 
"src/test/resources/org/apache/activemq/security/broker1.ks";
+
+    private BrokerService broker;
+    private final String transportType;
+    private final boolean clientSideEnabled;
+    private final boolean serverSideEnabled;
+
+    public MaxFrameSizeEnabledTest(String transportType, boolean 
clientSideEnabled, boolean serverSideEnabled) {
+        this.transportType = transportType;
+        this.clientSideEnabled = clientSideEnabled;
+        this.serverSideEnabled = serverSideEnabled;
+    }
+
+    
@Parameterized.Parameters(name="transportType={0},clientSideEnable={1},serverSideEnabled={2}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                //Both client and server side max frame check enabled
+                {"tcp", true, true},
+                {"ssl", true, true},
+                {"nio", true, true},
+                {"nio+ssl", true, true},
+                {"auto", true, true},
+                {"auto+ssl", true, true},
+                {"auto+nio", true, true},
+                {"auto+nio+ssl", true, true},
+
+                //Client side enabled but server side disabled
+                {"tcp", true, false},
+                {"ssl", true, false},
+                {"nio", true, false},
+                {"nio+ssl", true, false},
+                {"auto", true, false},
+                {"auto+ssl", true, false},
+                {"auto+nio", true, false},
+                {"auto+nio+ssl", true, false},
+
+                //Client side disabled but server side enabled
+                {"tcp", false, true},
+                {"ssl", false, true},
+                {"nio", false, true},
+                {"nio+ssl", false, true},
+                {"auto", false, true},
+                {"auto+ssl", false, true},
+                {"auto+nio", false, true},
+                {"auto+nio+ssl", false, true},
+
+                //Client side and server side disabled
+                {"tcp", false, false},
+                {"ssl", false, false},
+                {"nio", false, false},
+                {"nio+ssl", false, false},
+                {"auto", false, false},
+                {"auto+ssl", false, false},
+                {"auto+nio", false, false},
+                {"auto+nio+ssl", false, false},
+        });
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+    }
+
+    @After
+    public void after() throws Exception {
+        stopBroker(broker);
+    }
+
+    public BrokerService createBroker(String connectorName, String 
connectorString) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        TransportConnector connector = broker.addConnector(connectorString);
+        connector.setName(connectorName);
+        broker.start();
+        broker.waitUntilStarted();
+        return broker;
+    }
+
+    public void stopBroker(BrokerService broker) throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testMaxFrameSize() throws Exception {
+        broker = createBroker(transportType, transportType + 
"://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
+        testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + 
"://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort() +
+                getClientParams(), false);
+    }
+
+    @Test
+    public void testMaxFrameSizeCompression() throws Exception {
+        // Test message body length is 99841 bytes. Compresses to ~ 48000
+        broker = createBroker(transportType, transportType + 
"://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
+        testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + 
"://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort()
+                + getClientParams(), true);
+    }
+
+    protected void testMaxFrameSize(String transportType, String clientUri, 
boolean useCompression) throws Exception {
+        final List<Connection> connections = new ArrayList<>();
+        final ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(clientUri);
+        factory.setUseCompression(useCompression);
+
+        for (int i = 0; i < 10; i++) {
+            Connection connection = factory.createConnection();
+            connection.start();
+            connections.add(connection);
+        }
+
+        //Generate a body that is too large
+        StringBuffer body = new StringBuffer();
+        Random r = new Random();
+        for (int i = 0; i < 10000; i++) {
+            body.append(r.nextInt());
+        }
+
+        //Try sending 10 large messages rapidly in a loop to make sure all
+        //nio threads are allowed to send again and do not close server-side
+        for (int i = 0; i < 10; i++) {
+            boolean maxFrameSizeException = false;
+            boolean otherException = false;
+
+            Connection connection = null;
+            Session session = null;
+            Queue destination = null;
+            MessageConsumer messageConsumer = null;
+            MessageProducer producer = null;
+
+            try {
+                connection = connections.get(i);
+                session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                destination = session.createQueue("TEST");
+                producer = session.createProducer(destination);
+                producer.send(session.createTextMessage(body.toString()));
+            } catch (JMSException e) {
+                if (clientSideEnabled) {
+                    assertNotNull(e.getErrorCode());
+                    assertEquals("41300", e.getErrorCode());
+                    assertTrue(e.getCause() instanceof 
MaxFrameSizeExceededException);
+                } else {
+                    assertTrue(e.getCause() instanceof IOException);
+                }
+                assertNotNull(e.getCause());
+                maxFrameSizeException = true;
+            } catch (Exception e) {
+                otherException = true;
+            }
+
+            if (maxFrameSizeEnabled() && !useCompression) {
+                assertTrue("Should have gotten a jms maxframesize exception", 
maxFrameSizeException);
+                assertFalse("Should not have gotten a transport exception", 
otherException);
+            } else {
+                assertFalse("Should not have gotten a jms maxframesize 
exception", maxFrameSizeException);
+            }
+
+            if (!maxFrameSizeEnabled() && otherException) {
+                fail("Should not have gotten exception");
+            }
+
+            assertNotNull(connection);
+            assertNotNull(session);
+            assertNotNull(destination);
+            assertNotNull(producer);
+
+            if (connectionsShouldBeOpen(useCompression)) {
+                // Validate we can send a valid sized message after sending a 
too-large of message
+                boolean nextException = false;
+                try {
+
+                    messageConsumer = session.createConsumer(destination);
+                    producer.send(session.createTextMessage("Hello"));
+
+                    int maxLoops = 50;
+                    boolean found = false;
+                    do {
+                        Message message = messageConsumer.receive(200l);
+                        if (message != null) {
+                            
assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
+                            
TextMessage.class.cast(message).getText().equals("Hello");
+                            found = true;
+                        }
+                        maxLoops++;
+                    } while (!found && maxLoops <= 50);
+
+                } catch (Exception e) {
+                    nextException = true;
+                }
+                assertFalse("Should not have gotten an exception for the next 
message", nextException);
+            }
+        }
+
+
+        if (connectionsShouldBeOpen(useCompression)) {
+            //Verify that all connections are active
+            assertTrue(Wait.waitFor(() -> 
broker.getConnectorByName(transportType).getConnections().size() == 10,
+                    3000, 500));
+        } else {
+            //Verify that all connections are closed
+            assertTrue(Wait.waitFor(() -> 
broker.getConnectorByName(transportType).getConnections().size() == 0,
+                    3000, 500));
+        }
+
+        if (isNio() && connectionsShouldBeOpen(useCompression)) {
+            //Verify one active transport connections in the selector thread 
pool.
+            final ThreadPoolExecutor e = (ThreadPoolExecutor) 
SelectorManager.getInstance().getSelectorExecutor();
+            assertTrue(Wait.waitFor(() -> e.getActiveCount() == 1, 3000, 500));
+        }
+    }
+
+    private boolean maxFrameSizeEnabled() {
+        return clientSideEnabled || serverSideEnabled;
+    }
+
+    private boolean connectionsShouldBeOpen(boolean useCompression) {
+        return !maxFrameSizeEnabled() || clientSideEnabled || useCompression;
+    }
+
+    private boolean isSsl() {
+        return transportType.contains("ssl");
+    }
+
+    private boolean isNio() {
+        return transportType.contains("nio");
+    }
+
+    private String getServerParams() {
+        if (serverSideEnabled) {
+            return isSsl() ? "&transport.needClientAuth=true" : "";
+        } else {
+            return isSsl() ? 
"&transport.needClientAuth=true&wireFormat.maxFrameSizeEnabled=false" : 
"&wireFormat.maxFrameSizeEnabled=false";
+        }
+    }
+
+    private String getClientParams() {
+        if (clientSideEnabled) {
+            return isSsl() ? "?socket.verifyHostName=false" : "";
+        } else {
+            return isSsl() ? 
"?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : 
"?wireFormat.maxFrameSizeEnabled=false";
+        }
+    }
+}
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java
index df2e691..83a6939 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java
@@ -82,14 +82,14 @@ public class NIOMaxFrameSizeCleanupTest {
     public void testMaxFrameSizeCleanupNio() throws Exception {
         String transportType = "nio";
         broker = createBroker(transportType, transportType + 
"://localhost:0?wireFormat.maxFrameSize=1024");
-        testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort());
+        testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort() + 
"?wireFormat.maxFrameSizeEnabled=false");
     }
 
     @Test
     public void testMaxFrameSizeCleanupAutoNio() throws Exception {
         String transportType = "auto+nio";
         broker = createBroker(transportType, transportType + 
"://localhost:0?wireFormat.maxFrameSize=1024");
-        testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort());
+        testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort() + 
"?wireFormat.maxFrameSizeEnabled=false");
     }
 
     @Test
@@ -98,7 +98,7 @@ public class NIOMaxFrameSizeCleanupTest {
         broker = createBroker(transportType, transportType +
                 
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
         testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort()
-                + "?socket.verifyHostName=false");
+                + 
"?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false");
     }
 
     @Test
@@ -107,7 +107,7 @@ public class NIOMaxFrameSizeCleanupTest {
         broker = createBroker(transportType, transportType +
                 
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
         testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + 
broker.getConnectorByName(transportType).getConnectUri().getPort()
-                + "?socket.verifyHostName=false");
+                + 
"?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false");
     }
 
     protected void testMaxFrameSizeCleanup(String transportType, String 
clientUri) throws Exception {

Reply via email to