This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a718bc5932 ARTEMIS-5530 Some handling of compressed messages can throw
NegativeArraySizeException
a718bc5932 is described below
commit a718bc5932fbb578ba48eecbd8d2a7f7a762701b
Author: AntonRoskvist <[email protected]>
AuthorDate: Tue Jun 10 14:55:29 2025 +0200
ARTEMIS-5530 Some handling of compressed messages can throw
NegativeArraySizeException
co-author: In collaboration with Clebert Suconic
---
.../activemq/artemis/api/core/ICoreMessage.java | 2 +
.../core/client/impl/ClientConsumerImpl.java | 4 -
.../core/client/impl/ClientLargeMessageImpl.java | 8 ++
.../core/client/impl/ClientMessageImpl.java | 3 +-
.../core/client/impl/ClientProducerImpl.java | 4 +-
.../artemis/core/message/impl/CoreMessage.java | 6 +-
.../impl/journal/LargeServerMessageImpl.java | 2 +-
.../impl/nullpm/NullStorageLargeServerMessage.java | 3 +-
.../client/LargeMessageCompressTest.java | 104 +++++++++++++++++++--
.../federation/FederatedAddressTest.java | 62 ++++++++++++
.../unit/core/message/impl/MessageImplTest.java | 8 ++
11 files changed, 187 insertions(+), 19 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
index 722f445cde..48827209a1 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -29,6 +29,8 @@ import
org.apache.activemq.artemis.core.message.impl.CoreMessage;
*/
public interface ICoreMessage extends Message {
+ @Override
+ ICoreMessage copy();
/**
* The buffer will belong to this message, until release is called.
*/
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 7d6f76794c..d111fd37f1 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -643,9 +643,6 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
largeMessage.setLargeMessageController(new
CompressedLargeMessageControllerImpl(currentLargeMessageController));
// this is refeeding the packet after decompressed, hence the flow
control must be 0
currentLargeMessageController.addPacket(body, 0, false);
- largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
- //make sure the message is decompressed before it is handled
- largeMessage.checkCompletion();
currentLargeMessageController = null;
handleRegularMessage(largeMessage);
@@ -685,7 +682,6 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
if (clientLargeMessage.isCompressed()) {
clientLargeMessage.setLargeMessageController(new
CompressedLargeMessageControllerImpl(currentLargeMessageController));
- clientLargeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED,
false);
} else {
clientLargeMessage.setLargeMessageController(currentLargeMessageController);
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
index 75db7ece0f..411974f149 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
@@ -149,6 +149,14 @@ public final class ClientLargeMessageImpl extends
ClientMessageImpl implements C
writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET,
buffer.duplicate(), this);
largeMessageController.saveBuffer(new
ActiveMQOutputStream(writableBuffer));
+
+ unsetCompressionPropertyIfNeeded();
+ }
+ }
+
+ private void unsetCompressionPropertyIfNeeded() {
+ if (largeMessageController instanceof
CompressedLargeMessageControllerImpl) {
+ putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
}
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index f173d6d8ca..8b110bff43 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -24,6 +24,7 @@ import java.util.Objects;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
@@ -396,7 +397,7 @@ public class ClientMessageImpl extends CoreMessage
implements ClientMessageInter
}
@Override
- public Message copy() {
+ public ICoreMessage copy() {
return new ClientMessageImpl(this);
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 983debcf22..031c58a07d 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -396,7 +396,7 @@ public class ClientProducerImpl implements
ClientProducerInternal {
}
private void largeMessageSendStreamed(final boolean sendBlocking,
- final ICoreMessage msgI,
+ ICoreMessage msgI,
final InputStream
inputStreamParameter,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler)
throws ActiveMQException {
@@ -411,6 +411,8 @@ public class ClientProducerImpl implements
ClientProducerInternal {
DeflaterReader deflaterReader = null;
if (session.isCompressLargeMessages()) {
+ // We need to change properties the compressed message as we send it
+ msgI = msgI.copy();
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
deflaterReader = new DeflaterReader(inputStreamParameter,
messageSize);
deflaterReader.setLevel(session.getCompressionLevel());
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 9801a1e766..40e750216d 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -486,6 +486,10 @@ public class CoreMessage extends RefCountMessage
implements ICoreMessage {
}
if (other.buffer != null) {
this.buffer = other.buffer.copy();
+ if (this.buffer.capacity() == 0) {
+ // we are copying an empty buffer probably, we need to set the
proper capacity
+ this.buffer.capacity(other.buffer.capacity());
+ }
}
}
}
@@ -515,7 +519,7 @@ public class CoreMessage extends RefCountMessage implements
ICoreMessage {
}
@Override
- public Message copy() {
+ public ICoreMessage copy() {
getProperties();
checkEncode();
return new CoreMessage(this);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index da0f05437d..2a5b34f214 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -324,7 +324,7 @@ public final class LargeServerMessageImpl extends
CoreMessage implements CoreLar
}
@Override
- public Message copy() {
+ public ICoreMessage copy() {
SequentialFile newfile =
storageManager.createFileForLargeMessage(messageID, durable);
LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this,
properties, newfile, messageID);
newMessage.setParentRef(this);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
index 453779982e..9c87eed3d0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
@@ -18,6 +18,7 @@ package
org.apache.activemq.artemis.core.persistence.impl.nullpm;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -135,7 +136,7 @@ class NullStorageLargeServerMessage extends CoreMessage
implements CoreLargeServ
}
@Override
- public Message copy() {
+ public ICoreMessage copy() {
// This is a simple copy, used only to avoid changing original properties
return new NullStorageLargeServerMessage(this);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index 8fece2427e..8f2a57571b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -569,8 +569,68 @@ public class LargeMessageCompressTest extends
LargeMessageTestBase {
locator2.close();
}
+ @TestTemplate
+ public void testCompressedMessageRouting() throws Exception {
+ SimpleString DATA =
SimpleString.of(RandomUtil.randomAlphaNumericString(110 * 1024));
+
+ ActiveMQServer server = createServer(true, isNetty());
+ server.start();
+
+
server.createQueue(QueueConfiguration.of(ADDRESS).setRoutingType(RoutingType.ANYCAST));
+
+ locator.setAckBatchSize(0);
+
+ try (ServerLocator locator2 = createFactory(isNetty()); ServerLocator
locator3 = createFactory(isNetty())) {
+ locator2.setMinLargeMessageSize(10240);
+ //Any sufficiently large value here causes a
"java.lang.NegativeArraySizeException"
+ locator3.setMinLargeMessageSize(1024000);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(true, true);
+ ClientProducer producer = session.createProducer(ADDRESS);
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeNullableSimpleString(DATA);
+ producer.send(message);
+
+ session.start();
+ message = consumer.receive(2000);
+ assertNotNull(message);
+ assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+ message.checkCompletion();
+ message.acknowledge();
+
+ ClientSessionFactory sf2 = locator2.createSessionFactory();
+ ClientSessionFactory sf3 = locator3.createSessionFactory();
+ ClientSession session2 = sf2.createSession(true, true);
+ ClientSession session3 = sf3.createSession(true, true);
+ ClientProducer producer2 = session2.createProducer(ADDRESS);
+ ClientProducer producer3 = session3.createProducer(ADDRESS);
+ ClientMessage receivedMessage;
+
+ //Notice the _AMQ_LARGE_SIZE value changing part way through
+ for (int i = 0; i < 3; i++) {
+ producer.send(message);
+ producer2.send(message);
+ producer3.send(message);
+ }
+
+ for (int i = 0; i < 9; i++) {
+ receivedMessage = consumer.receive(2000);
+ assertNotNull(receivedMessage);
+ assertEquals(DATA,
receivedMessage.getBodyBuffer().readNullableSimpleString());
+ receivedMessage.acknowledge();
+ }
+
+ consumer.close();
+ }
+
+ }
+
@TestTemplate
public void testLargeMessageCompressionLevel() throws Exception {
+ SimpleString DATA =
SimpleString.of(RandomUtil.randomAlphaNumericString(1024 * 1024));
SimpleString address1 = SimpleString.of("address1");
SimpleString address2 = SimpleString.of("address2");
@@ -602,16 +662,16 @@ public class LargeMessageCompressTest extends
LargeMessageTestBase {
session2.createQueue(QueueConfiguration.of(address2));
session3.createQueue(QueueConfiguration.of(address3));
- String inputString =
"blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
- for (int i = 0; i < 20; i++) {
- inputString = inputString + inputString;
- }
+ ClientMessage message1 = session1.createMessage(true);
+ ClientMessage message2 = session2.createMessage(true);
+ ClientMessage message3 = session3.createMessage(true);
+ message1.getBodyBuffer().writeNullableSimpleString(DATA);
+ message2.getBodyBuffer().writeNullableSimpleString(DATA);
+ message3.getBodyBuffer().writeNullableSimpleString(DATA);
- ClientMessage message = session1.createMessage(true);
- message.getBodyBuffer().writeString(inputString);
- producer1.send(message);
- producer2.send(message);
- producer3.send(message);
+ producer1.send(message1);
+ producer2.send(message2);
+ producer3.send(message3);
QueueControl queueControl1 = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + address1);
@@ -623,10 +683,34 @@ public class LargeMessageCompressTest extends
LargeMessageTestBase {
assertEquals(1, queueControl1.getMessageCount());
assertEquals(1, queueControl2.getMessageCount());
assertEquals(1, queueControl3.getMessageCount());
- assertTrue(message.getPersistentSize() >
queueControl1.getPersistentSize());
+
+ assertTrue(message1.getPersistentSize() >
queueControl1.getPersistentSize());
assertTrue(queueControl1.getPersistentSize() >
queueControl2.getPersistentSize());
assertTrue(queueControl2.getPersistentSize() >
queueControl3.getPersistentSize());
+ ClientConsumer consumer1 = session1.createConsumer(address1);
+ ClientConsumer consumer2 = session2.createConsumer(address2);
+ ClientConsumer consumer3 = session3.createConsumer(address3);
+ session1.start();
+ session2.start();
+ session3.start();
+
+ ClientMessage message;
+ message = consumer1.receive(2000);
+ assertNotNull(message);
+ assertEquals(DATA, message.getBodyBuffer().readNullableSimpleString());
+ message.acknowledge();
+
+ message = consumer2.receive(2000);
+ assertNotNull(message);
+ assertEquals(DATA, message.getBodyBuffer().readNullableSimpleString());
+ message.acknowledge();
+
+ message = consumer3.receive(2000);
+ assertNotNull(message);
+ assertEquals(DATA, message.getBodyBuffer().readNullableSimpleString());
+ message.acknowledge();
+
sf1.close();
sf2.close();
sf3.close();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java
index c5e68a13a2..6318f7bb0b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java
@@ -24,6 +24,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -37,9 +38,11 @@ import
org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -647,6 +650,65 @@ public class FederatedAddressTest extends
FederatedTestBase {
}
}
+ @Test
+ public void testUpstreamFederatedAddressWithCompressedMessage() throws
Exception {
+ final String DATA_REGULAR = RandomUtil.randomAlphaNumericString(10);
+ final String DATA_COMPRESSED_REGULAR = "Compresses easily".repeat(500 *
1024);
+ final String DATA_COMPRESSED_LARGE =
RandomUtil.randomAlphaNumericString(1024 * 1024);
+ final String address = getName();
+
+ FederationConfiguration federationConfiguration =
FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1",
address);
+
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
+ getServer(0).getFederationManager().deploy();
+
+ FederationConfiguration federationConfiguration2 =
FederatedTestUtil.createAddressUpstreamFederationConfiguration("server0",
address);
+
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2);
+ getServer(1).getFederationManager().deploy();
+
+ ActiveMQConnectionFactory cf0 = new ActiveMQConnectionFactory("vm://" +
0);
+ ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://" +
1);
+ cf0.setCompressLargeMessage(true);
+ cf1.setCompressLargeMessage(true);
+
+ try (Connection connection1 = cf1.createConnection();
+ Connection connection0 = cf0.createConnection()) {
+ connection1.start();
+ connection0.start();
+
+ Session session0 = connection0.createSession();
+ Session session1 = connection1.createSession();
+ Topic topic0 = session0.createTopic(address);
+ Topic topic1 = session1.createTopic(address);
+ MessageConsumer consumer0 = session0.createConsumer(topic0);
+ MessageConsumer consumer1 = session1.createConsumer(topic1);
+
+ MessageProducer producer = session0.createProducer(topic0);
+
+ sendAndConsume(producer, session0, DATA_REGULAR, "regular",
consumer0, consumer1);
+ sendAndConsume(producer, session0, DATA_COMPRESSED_LARGE,
"compressedLarge", consumer0, consumer1);
+ sendAndConsume(producer, session0, DATA_COMPRESSED_REGULAR,
"compressedRegular", consumer0, consumer1);
+ }
+ }
+
+ private void sendAndConsume(MessageProducer producer,
+ Session session0,
+ String data,
+ String identification,
+ MessageConsumer consumer0,
+ MessageConsumer consumer1) throws Exception {
+ {
+ TextMessage message = session0.createTextMessage(data);
+ message.setStringProperty("identification", identification);
+ producer.send(message);
+ }
+ TextMessage message0 = (TextMessage) consumer0.receive(1000);
+ TextMessage message1 = (TextMessage) consumer1.receive(1000);
+ assertNotNull(message0);
+ assertNotNull(message1);
+ assertEquals(data, message0.getText());
+ assertEquals(data, message1.getText());
+ }
+
private Message createTextMessage(Session session1, String group) throws
JMSException {
Message message = session1.createTextMessage("hello");
message.setStringProperty("JMSXGroupID", group);
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
index c014867312..50e81f6d7a 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
@@ -41,6 +41,7 @@ import
org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -253,6 +254,13 @@ public class MessageImplTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testCopyAndGetBuffer() throws Exception {
+ ClientMessageImpl messageImpl = new ClientMessageImpl((byte)0, true, 0L,
0L, (byte)0, 100, new CoreMessageObjectPools());
+ ICoreMessage copy = messageImpl.copy();
+ copy.getBodyBuffer();
+ }
+
@Test
public void testMessageCopyHeadersAndProperties() {
CoreMessage msg1 = new CoreMessage(123, 18);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact