This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new f56595b89b ARTEMIS-4185 - Revision on sending already compressed 
messages
f56595b89b is described below

commit f56595b89b5b32ecd928a2c0d2c76410879daaed
Author: a181321 <anton.roskv...@volvo.com>
AuthorDate: Wed Nov 22 10:19:14 2023 +0100

    ARTEMIS-4185 - Revision on sending already compressed messages
---
 .../core/client/impl/ClientConsumerImpl.java       |  2 +
 .../core/client/impl/ClientProducerImpl.java       |  4 --
 .../client/LargeMessageCompressTest.java           | 74 ++++++++++++----------
 3 files changed, 43 insertions(+), 37 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 75b5621843..b8a7b86546 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -646,6 +646,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
       qbuff.readBytes(body);
       largeMessage.setLargeMessageController(new 
CompressedLargeMessageControllerImpl(currentLargeMessageController));
       currentLargeMessageController.addPacket(body, body.length, false);
+      largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
 
       handleRegularMessage(largeMessage);
    }
@@ -674,6 +675,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
 
       if (clientLargeMessage.isCompressed()) {
          clientLargeMessage.setLargeMessageController(new 
CompressedLargeMessageControllerImpl(currentLargeMessageController));
+         clientLargeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, 
false);
       } else {
          
clientLargeMessage.setLargeMessageController(currentLargeMessageController);
       }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index c0a238bf2a..f3b01a7eca 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -443,10 +443,6 @@ public class ClientProducerImpl implements 
ClientProducerInternal {
          deflaterReader = new DeflaterReader(inputStreamParameter, 
messageSize);
          deflaterReader.setLevel(session.getCompressionLevel());
          input = deflaterReader;
-      } else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
-         //This needs to be false if we do not intend to compress the message
-         //and the header already exists
-         msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
       }
 
       long totalSize = 0;
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index 53661c7a8e..d85ce0a50b 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Deflater;
 
 import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -518,52 +519,59 @@ public class LargeMessageCompressTest extends 
LargeMessageTest {
    public void testPreviouslyCompressedMessageCleanup() throws Exception {
       final int messageSize = 1024 * 1024;
 
+      byte[] payload = new byte[messageSize];
+      byte[] response = new byte[messageSize];
+
       ActiveMQServer server = createServer(true, isNetty());
       server.start();
 
-      ClientSessionFactory sf1 = createSessionFactory(locator);
-      ClientSession session1 = addClientSession(sf1.createSession(false, true, 
true));
-      session1.createQueue(new 
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
-      ClientProducer producer = session1.createProducer(ADDRESS);
-
-      ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0");
-      locator2.setCompressLargeMessage(false);
-      ClientSessionFactory sf2 = locator2.createSessionFactory();
-      ClientSession session2 = sf2.createSession(false, true, true);
-      ClientConsumer consumer = session2.createConsumer(ADDRESS);
-      ClientProducer producer2 = session2.createProducer(ADDRESS);
-      session2.start();
-
-      byte[] payload = new byte[messageSize];
-      byte[] response = new byte[messageSize];
+      server.createQueue(new 
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
 
       for (int i = 0; i < payload.length; i++) {
          payload[i] = RandomUtil.randomByte();
       }
 
-      ClientMessage message = session1.createMessage(true);
-      message.getBodyBuffer().writeBytes(payload);
-      producer.send(message);
+      try (ClientSessionFactory sf = locator.createSessionFactory();
+           ClientSession session = sf.createSession(true, true);
+           ClientProducer producer = session.createProducer(ADDRESS)) {
 
-      message = consumer.receive();
-      assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+         ClientMessage message = session.createMessage(true);
+         message.getBodyBuffer().writeBytes(payload);
+         assertNull(message.getAnnotation(Message.HDR_LARGE_COMPRESSED));
 
-      message.getBodyBuffer().readBytes(response);
-      message.getBodyBuffer().writeBytes(response);
-      producer2.send(message);
+         producer.send(message);
+      }
 
-      message = consumer.receive();
-      assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+      ServerLocator locator2 = createFactory(isNetty());
+      locator2.setCompressLargeMessage(false);
+      locator2.setMinLargeMessageSize(1024);
 
-      message.getBodyBuffer().readBytes(payload);
-      message.getBodySize();
-      assertTrue(Arrays.equals(payload, response));
+      try (ClientSessionFactory sf = locator2.createSessionFactory();
+           ClientSession session = sf.createSession(true, true);
+           ClientConsumer consumer = session.createConsumer(ADDRESS);
+           ClientProducer producer = session.createProducer(ADDRESS)) {
+
+         ClientMessage message = session.createMessage(true);
+         ICoreMessage serverMessage = 
server.locateQueue(ADDRESS).browserIterator().next().getMessage().copy().toCore();
+
+         message.moveHeadersAndProperties(serverMessage);
+         
message.getBodyBuffer().writeBytes(serverMessage.getReadOnlyBodyBuffer(), 
serverMessage.getBodyBufferSize());
+
+         assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+
+         producer.send(message);
+         session.start();
+
+         for (int i = 0; i < 2; i++) {
+            message = consumer.receive(2000);
+            assertNotNull(message);
+
+            message.getBodyBuffer().readBytes(response);
+            assertTrue(Arrays.equals(payload, response));
+            message.acknowledge();
+         }
+      }
 
-      session1.close();
-      session2.close();
-      sf1.close();
-      locator.close();
-      sf2.close();
       locator2.close();
    }
 

Reply via email to