This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 7e350d29c8 ARTEMIS-5616 Rollback of compressed messages return too
many credits
7e350d29c8 is described below
commit 7e350d29c815810c1ba5fbbcb1c2141629f83772
Author: AntonRoskvist <[email protected]>
AuthorDate: Thu Mar 20 15:32:28 2025 +0100
ARTEMIS-5616 Rollback of compressed messages return too many credits
co-author: In collaboration with Clebert Suconic
---
.../core/client/impl/ClientConsumerImpl.java | 14 ++-
.../core/client/impl/ClientLargeMessageImpl.java | 5 +
.../core/client/impl/ClientMessageInternal.java | 4 +
.../impl/CompressedLargeMessageControllerImpl.java | 10 ++
.../core/client/impl/LargeMessageController.java | 7 ++
.../client/impl/LargeMessageControllerImpl.java | 15 ++-
.../CompressedMessagesClientCreditsTest.java | 124 +++++++++++++++++++++
7 files changed, 170 insertions(+), 9 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index e829277144..7d6f76794c 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -574,7 +574,7 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
}
if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
- handleCompressedMessage(message);
+ handleCompressedMessageSentAsRegular(message);
} else {
handleRegularMessage(message);
}
@@ -618,7 +618,7 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
* Say that you sent a 1G message full of spaces. That could be just bellow
100K compressed but you wouldn't have
* enough memory to decompress it
*/
- private void handleCompressedMessage(final ClientMessageInternal clMessage)
throws Exception {
+ private void handleCompressedMessageSentAsRegular(final
ClientMessageInternal clMessage) throws Exception {
ClientLargeMessageImpl largeMessage = new ClientLargeMessageImpl();
largeMessage.retrieveExistingData(clMessage);
@@ -633,7 +633,7 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
long callTimeout = locator.getCallTimeout();
currentLargeMessageController = new LargeMessageControllerImpl(this,
largeMessage.getLargeMessageSize(), callTimeout, largeMessageCache);
- currentLargeMessageController.setLocal(true);
+ currentLargeMessageController.setOriginallyRegular(true);
//sets the packet
ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer();
@@ -641,8 +641,12 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
final byte[] body = new byte[bytesToRead];
qbuff.readBytes(body);
largeMessage.setLargeMessageController(new
CompressedLargeMessageControllerImpl(currentLargeMessageController));
- currentLargeMessageController.addPacket(body, body.length, false);
+ // this is re-feeding the packet after decompression, hence the flow
control size must be 0
+ currentLargeMessageController.addPacket(body, 0, false);
largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
+ //make sure the message is decompressed before it is handled
+ largeMessage.checkCompletion();
+ currentLargeMessageController = null;
handleRegularMessage(largeMessage);
}
@@ -1058,7 +1062,7 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
// Chunk messages will execute the flow control while receiving the
chunks
if (message.getFlowControlSize() != 0) {
// on large messages we should discount 1 on the first packets as we
need continuity until the last packet
- flowControl(message.getFlowControlSize(), !message.isLargeMessage());
+ flowControl(message.getFlowControlSize(), !message.isLargeMessage()
|| message.isOriginallyRegular());
}
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
index f024f5ccd3..75db7ece0f 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
@@ -166,6 +166,11 @@ public final class ClientLargeMessageImpl extends
ClientMessageImpl implements C
}
}
+ @Override
+ public boolean isOriginallyRegular() {
+ return largeMessageController.isOriginallyRegular();
+ }
+
public void retrieveExistingData(ClientMessageInternal clMessage) {
this.internalSetMessageID(clMessage.getMessageID());
this.address = clMessage.getAddressSimpleString();
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
index 3edeae5b13..b72147a0d5 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
@@ -33,4 +33,8 @@ public interface ClientMessageInternal extends ClientMessage {
boolean isCompressed();
+ default boolean isOriginallyRegular() {
+ return false;
+ }
+
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 92be797a1e..18389bbeb1 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -48,6 +48,16 @@ final class CompressedLargeMessageControllerImpl implements
LargeMessageControll
bufferDelegate.discardUnusedPackets();
}
+ @Override
+ public boolean isOriginallyRegular() {
+ return bufferDelegate.isOriginallyRegular();
+ }
+
+ @Override
+ public void setOriginallyRegular(boolean regular) {
+ bufferDelegate.setOriginallyRegular(regular);
+ }
+
/**
* Add a buff to the List, or save it to the OutputStream if set
*/
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
index f943ea40bc..3b6766502e 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
@@ -23,6 +23,13 @@ import
org.apache.activemq.artemis.api.core.ActiveMQException;
public interface LargeMessageController extends ActiveMQBuffer {
+ /**
+ * Whether the message was originally a regular message, i.e. a compressed message that was sent as regular.
+ */
+ boolean isOriginallyRegular();
+
+ void setOriginallyRegular(boolean regular);
+
/**
* {@return the size of this buffer.}
*/
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 28b679cb05..5c3caf54b4 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -91,7 +91,8 @@ public class LargeMessageControllerImpl implements
LargeMessageController {
private final FileCache fileCache;
- private boolean local = false;
+ // converted from a regular compressed message
+ private boolean originallyRegular = false;
public LargeMessageControllerImpl(final ClientConsumerInternal
consumerInternal,
final long totalSize,
@@ -122,14 +123,20 @@ public class LargeMessageControllerImpl implements
LargeMessageController {
this.bufferSize = bufferSize;
}
- public void setLocal(boolean local) {
- this.local = local;
+ @Override
+ public boolean isOriginallyRegular() {
+ return originallyRegular;
+ }
+
+ @Override
+ public void setOriginallyRegular(boolean originallyRegular) {
+ this.originallyRegular = originallyRegular;
}
@Override
public void discardUnusedPackets() {
if (outStream == null) {
- if (local)
+ if (originallyRegular)
return;
try {
checkForPacket(totalSize - 1);
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/CompressedMessagesClientCreditsTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/CompressedMessagesClientCreditsTest.java
new file mode 100644
index 0000000000..ee56262e29
--- /dev/null
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/CompressedMessagesClientCreditsTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class CompressedMessagesClientCreditsTest extends ActiveMQTestBase {
+
+ ActiveMQServer server;
+
+ @BeforeEach
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ this.server = this.createServer(true, true);
+ server.getAddressSettingsRepository().clear();
+ server.getAddressSettingsRepository().addMatch("#", new
AddressSettings().setMaxDeliveryAttempts(-1));
+ server.start();
+ }
+
+ @Test
+ public void testRollbackOnCompressedButRegularMessages() throws Exception {
+ // this will compress really well, to the point that the message
will be sent as a regular message
+ internalTest(" ".repeat(500 * 1024));
+ }
+
+ @Test
+ public void testRollbackOnCompressedStillLarge() throws Exception {
+ // the string will still be large even after compression
+ internalTest(RandomUtil.randomAlphaNumericString(500 * 1024));
+ }
+
+ private void internalTest(String originalString) throws Exception {
+ final String queueName = "queue";
+
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
+ connectionFactory.setCompressLargeMessage(true);
+ connectionFactory.setCompressionLevel(6);
+ connectionFactory.setConsumerWindowSize(0);
+ connectionFactory.setMinLargeMessageSize(1024 * 10);
+
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.start();
+
+ Session producerSession =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
producerSession.createProducer(producerSession.createQueue(queueName));
+
+ TextMessage sendMessage = producerSession.createTextMessage();
+ sendMessage.setText(originalString);
+
+ Session consumerSession =
connection.createSession(Session.SESSION_TRANSACTED);
+ MessageConsumer consumer =
consumerSession.createConsumer(consumerSession.createQueue(queueName));
+
+ Queue queue = server.locateQueue(queueName);
+
+ ServerConsumerImpl serverConsumer = null;
+ for (ServerSession s : server.getSessions()) {
+ if (s.getConsumerCount() > 0) {
+ serverConsumer = (ServerConsumerImpl)
s.getServerConsumers().iterator().next();
+ }
+ }
+
+ int numberOfMessages = 10;
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ producer.send(sendMessage);
+ }
+ Wait.assertEquals((long) numberOfMessages, queue::getMessageCount,
5000, 100);
+
+ int rollbackLoop = 10;
+
+ assertEquals(0, serverConsumer.getAvailableCredits().get());
+ for (int i = 0; i < rollbackLoop; i++) {
+ assertNotNull(consumer.receive(1000));
+ queue.flushExecutor();
+ Wait.assertEquals(0, serverConsumer.getAvailableCredits()::get,
5000, 100);
+ Wait.assertEquals(1, queue::getDeliveringCount, 5000, 100);
+ consumerSession.rollback();
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertNotNull(consumer.receive(5000));
+ }
+ assertNull(consumer.receiveNoWait());
+ consumerSession.commit();
+
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact