This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e350d29c8 ARTEMIS-5616 Rollback of compressed messages return too 
many credits
7e350d29c8 is described below

commit 7e350d29c815810c1ba5fbbcb1c2141629f83772
Author: AntonRoskvist <[email protected]>
AuthorDate: Thu Mar 20 15:32:28 2025 +0100

    ARTEMIS-5616 Rollback of compressed messages return too many credits
    
    co-author: In collaboration with Clebert Suconic
---
 .../core/client/impl/ClientConsumerImpl.java       |  14 ++-
 .../core/client/impl/ClientLargeMessageImpl.java   |   5 +
 .../core/client/impl/ClientMessageInternal.java    |   4 +
 .../impl/CompressedLargeMessageControllerImpl.java |  10 ++
 .../core/client/impl/LargeMessageController.java   |   7 ++
 .../client/impl/LargeMessageControllerImpl.java    |  15 ++-
 .../CompressedMessagesClientCreditsTest.java       | 124 +++++++++++++++++++++
 7 files changed, 170 insertions(+), 9 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index e829277144..7d6f76794c 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -574,7 +574,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
       }
 
       if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
-         handleCompressedMessage(message);
+         handleCompressedMessageSentAsRegular(message);
       } else {
          handleRegularMessage(message);
       }
@@ -618,7 +618,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
     * Say that you sent a 1G message full of spaces. That could be just bellow 
100K compressed but you wouldn't have
     * enough memory to decompress it
     */
-   private void handleCompressedMessage(final ClientMessageInternal clMessage) 
throws Exception {
+   private void handleCompressedMessageSentAsRegular(final 
ClientMessageInternal clMessage) throws Exception {
       ClientLargeMessageImpl largeMessage = new ClientLargeMessageImpl();
       largeMessage.retrieveExistingData(clMessage);
 
@@ -633,7 +633,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
       long callTimeout = locator.getCallTimeout();
 
       currentLargeMessageController = new LargeMessageControllerImpl(this, 
largeMessage.getLargeMessageSize(), callTimeout, largeMessageCache);
-      currentLargeMessageController.setLocal(true);
+      currentLargeMessageController.setOriginallyRegular(true);
 
       //sets the packet
       ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer();
@@ -641,8 +641,12 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
       final byte[] body = new byte[bytesToRead];
       qbuff.readBytes(body);
       largeMessage.setLargeMessageController(new 
CompressedLargeMessageControllerImpl(currentLargeMessageController));
-      currentLargeMessageController.addPacket(body, body.length, false);
+      // this is refeeding the packet after decompressed, hence the flow 
control must be 0
+      currentLargeMessageController.addPacket(body, 0, false);
       largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
+      //make sure the message is decompressed before it is handled
+      largeMessage.checkCompletion();
+      currentLargeMessageController = null;
 
       handleRegularMessage(largeMessage);
    }
@@ -1058,7 +1062,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
       // Chunk messages will execute the flow control while receiving the 
chunks
       if (message.getFlowControlSize() != 0) {
          // on large messages we should discount 1 on the first packets as we 
need continuity until the last packet
-         flowControl(message.getFlowControlSize(), !message.isLargeMessage());
+         flowControl(message.getFlowControlSize(), !message.isLargeMessage() 
|| message.isOriginallyRegular());
       }
    }
 
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
index f024f5ccd3..75db7ece0f 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
@@ -166,6 +166,11 @@ public final class ClientLargeMessageImpl extends 
ClientMessageImpl implements C
       }
    }
 
+   @Override
+   public boolean isOriginallyRegular() {
+      return largeMessageController.isOriginallyRegular();
+   }
+
    public void retrieveExistingData(ClientMessageInternal clMessage) {
       this.internalSetMessageID(clMessage.getMessageID());
       this.address = clMessage.getAddressSimpleString();
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
index 3edeae5b13..b72147a0d5 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
@@ -33,4 +33,8 @@ public interface ClientMessageInternal extends ClientMessage {
 
    boolean isCompressed();
 
+   default boolean isOriginallyRegular() {
+      return false;
+   }
+
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 92be797a1e..18389bbeb1 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -48,6 +48,16 @@ final class CompressedLargeMessageControllerImpl implements 
LargeMessageControll
       bufferDelegate.discardUnusedPackets();
    }
 
+   @Override
+   public boolean isOriginallyRegular() {
+      return bufferDelegate.isOriginallyRegular();
+   }
+
+   @Override
+   public void setOriginallyRegular(boolean regular) {
+      bufferDelegate.setOriginallyRegular(regular);
+   }
+
    /**
     * Add a buff to the List, or save it to the OutputStream if set
     */
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
index f943ea40bc..3b6766502e 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
@@ -23,6 +23,13 @@ import 
org.apache.activemq.artemis.api.core.ActiveMQException;
 
 public interface LargeMessageController extends ActiveMQBuffer {
 
+   /**
+    * Message was originally a regular message.
+    * */
+   boolean isOriginallyRegular();
+
+   void setOriginallyRegular(boolean regular);
+
    /**
     * {@return the size of this buffer.}
     */
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 28b679cb05..5c3caf54b4 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -91,7 +91,8 @@ public class LargeMessageControllerImpl implements 
LargeMessageController {
 
    private final FileCache fileCache;
 
-   private boolean local = false;
+   // converted from a regular compressed message
+   private boolean originallyRegular = false;
 
    public LargeMessageControllerImpl(final ClientConsumerInternal 
consumerInternal,
                                      final long totalSize,
@@ -122,14 +123,20 @@ public class LargeMessageControllerImpl implements 
LargeMessageController {
       this.bufferSize = bufferSize;
    }
 
-   public void setLocal(boolean local) {
-      this.local = local;
+   @Override
+   public boolean isOriginallyRegular() {
+      return originallyRegular;
+   }
+
+   @Override
+   public void setOriginallyRegular(boolean originallyRegular) {
+      this.originallyRegular = originallyRegular;
    }
 
    @Override
    public void discardUnusedPackets() {
       if (outStream == null) {
-         if (local)
+         if (originallyRegular)
             return;
          try {
             checkForPacket(totalSize - 1);
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/CompressedMessagesClientCreditsTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/CompressedMessagesClientCreditsTest.java
new file mode 100644
index 0000000000..ee56262e29
--- /dev/null
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/CompressedMessagesClientCreditsTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class CompressedMessagesClientCreditsTest extends ActiveMQTestBase {
+
+   ActiveMQServer server;
+
+   @BeforeEach
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      this.server = this.createServer(true, true);
+      server.getAddressSettingsRepository().clear();
+      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setMaxDeliveryAttempts(-1));
+      server.start();
+   }
+
+   @Test
+   public void testRollbackOnCompressedButRegularMessages() throws Exception {
+      // this will make it compress really well, up to the point the message 
will be regular
+      internalTest(" ".repeat(500 * 1024));
+   }
+
+   @Test
+   public void testRollbackOnCompressedStillLarge() throws Exception {
+      // the string will still be large even after compression
+      internalTest(RandomUtil.randomAlphaNumericString(500 * 1024));
+   }
+
+   private void internalTest(String originalString) throws Exception {
+      final String queueName = "queue";
+
+      ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("tcp://localhost:61616");
+      connectionFactory.setCompressLargeMessage(true);
+      connectionFactory.setCompressionLevel(6);
+      connectionFactory.setConsumerWindowSize(0);
+      connectionFactory.setMinLargeMessageSize(1024 * 10);
+
+      try (Connection connection = connectionFactory.createConnection()) {
+         connection.start();
+
+         Session producerSession = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
producerSession.createProducer(producerSession.createQueue(queueName));
+
+         TextMessage sendMessage = producerSession.createTextMessage();
+         sendMessage.setText(originalString);
+
+         Session consumerSession = 
connection.createSession(Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = 
consumerSession.createConsumer(consumerSession.createQueue(queueName));
+
+         Queue queue = server.locateQueue(queueName);
+
+         ServerConsumerImpl serverConsumer = null;
+         for (ServerSession s : server.getSessions()) {
+            if (s.getConsumerCount() > 0) {
+               serverConsumer = (ServerConsumerImpl) 
s.getServerConsumers().iterator().next();
+            }
+         }
+
+         int numberOfMessages = 10;
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            producer.send(sendMessage);
+         }
+         Wait.assertEquals((long) numberOfMessages, queue::getMessageCount, 
5000, 100);
+
+         int rollbackLoop = 10;
+
+         assertEquals(0, serverConsumer.getAvailableCredits().get());
+         for (int i = 0; i < rollbackLoop; i++) {
+            assertNotNull(consumer.receive(1000));
+            queue.flushExecutor();
+            Wait.assertEquals(0, serverConsumer.getAvailableCredits()::get, 
5000, 100);
+            Wait.assertEquals(1, queue::getDeliveringCount, 5000, 100);
+            consumerSession.rollback();
+         }
+
+         for (int i = 0; i < 10; i++) {
+            assertNotNull(consumer.receive(5000));
+         }
+         assertNull(consumer.receiveNoWait());
+         consumerSession.commit();
+
+      }
+   }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to