This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new f56595b89b ARTEMIS-4185 - Revision on sending already compressed messages f56595b89b is described below commit f56595b89b5b32ecd928a2c0d2c76410879daaed Author: a181321 <anton.roskv...@volvo.com> AuthorDate: Wed Nov 22 10:19:14 2023 +0100 ARTEMIS-4185 - Revision on sending already compressed messages --- .../core/client/impl/ClientConsumerImpl.java | 2 + .../core/client/impl/ClientProducerImpl.java | 4 -- .../client/LargeMessageCompressTest.java | 74 ++++++++++++---------- 3 files changed, 43 insertions(+), 37 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index 75b5621843..b8a7b86546 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -646,6 +646,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { qbuff.readBytes(body); largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController)); currentLargeMessageController.addPacket(body, body.length, false); + largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false); handleRegularMessage(largeMessage); } @@ -674,6 +675,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { if (clientLargeMessage.isCompressed()) { clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController)); + clientLargeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false); } else { clientLargeMessage.setLargeMessageController(currentLargeMessageController); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index c0a238bf2a..f3b01a7eca 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -443,10 +443,6 @@ public class ClientProducerImpl implements ClientProducerInternal { deflaterReader = new DeflaterReader(inputStreamParameter, messageSize); deflaterReader.setLevel(session.getCompressionLevel()); input = deflaterReader; - } else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) { - //This needs to be false if we do not intend to compress the message - //and the header already exists - msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false); } long totalSize = 0; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index 53661c7a8e..d85ce0a50b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Deflater; import io.netty.util.internal.PlatformDependent; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -518,52 +519,59 @@ public class LargeMessageCompressTest extends LargeMessageTest { public void testPreviouslyCompressedMessageCleanup() throws Exception { final int messageSize = 1024 * 1024; + byte[] payload = new byte[messageSize]; + byte[] response = new byte[messageSize]; + ActiveMQServer server = createServer(true, isNetty()); server.start(); - ClientSessionFactory sf1 = createSessionFactory(locator); - ClientSession session1 = addClientSession(sf1.createSession(false, true, true)); - session1.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST)); - ClientProducer producer = session1.createProducer(ADDRESS); - - ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0"); - locator2.setCompressLargeMessage(false); - ClientSessionFactory sf2 = locator2.createSessionFactory(); - ClientSession session2 = sf2.createSession(false, true, true); - ClientConsumer consumer = session2.createConsumer(ADDRESS); - ClientProducer producer2 = session2.createProducer(ADDRESS); - session2.start(); - - byte[] payload = new byte[messageSize]; - byte[] response = new byte[messageSize]; + server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST)); for (int i = 0; i < payload.length; i++) { payload[i] = RandomUtil.randomByte(); } - ClientMessage message = session1.createMessage(true); - message.getBodyBuffer().writeBytes(payload); - producer.send(message); + try (ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession(true, true); + ClientProducer producer = session.createProducer(ADDRESS)) { - message = consumer.receive(); - assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)); + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeBytes(payload); + assertNull(message.getAnnotation(Message.HDR_LARGE_COMPRESSED)); - message.getBodyBuffer().readBytes(response); - message.getBodyBuffer().writeBytes(response); - producer2.send(message); + producer.send(message); + } - message = consumer.receive(); - assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)); + ServerLocator locator2 = createFactory(isNetty()); + locator2.setCompressLargeMessage(false); + locator2.setMinLargeMessageSize(1024); - message.getBodyBuffer().readBytes(payload); - message.getBodySize(); - assertTrue(Arrays.equals(payload, response)); + try (ClientSessionFactory sf = locator2.createSessionFactory(); + ClientSession session = sf.createSession(true, true); + ClientConsumer consumer = session.createConsumer(ADDRESS); + ClientProducer producer = session.createProducer(ADDRESS)) { + + ClientMessage message = session.createMessage(true); + ICoreMessage serverMessage = server.locateQueue(ADDRESS).browserIterator().next().getMessage().copy().toCore(); + + message.moveHeadersAndProperties(serverMessage); + message.getBodyBuffer().writeBytes(serverMessage.getReadOnlyBodyBuffer(), serverMessage.getBodyBufferSize()); + + assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)); + + producer.send(message); + session.start(); + + for (int i = 0; i < 2; i++) { + message = consumer.receive(2000); + assertNotNull(message); + + message.getBodyBuffer().readBytes(response); + assertTrue(Arrays.equals(payload, response)); + message.acknowledge(); + } + } - session1.close(); - session2.close(); - sf1.close(); - locator.close(); - sf2.close(); locator2.close(); }