This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 2583ac6d79 ARTEMIS-5523 Fixing race between on estimating AMQP Message
Memory Size
2583ac6d79 is described below
commit 2583ac6d79b325e8fb2ebc21a9358c054206ae11
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Jun 5 14:40:18 2025 -0400
ARTEMIS-5523 Fixing race between on estimating AMQP Message Memory Size
---
.../apache/activemq/artemis/api/core/Message.java | 3 +
.../protocol/amqp/broker/AMQPLargeMessage.java | 2 +-
.../artemis/protocol/amqp/broker/AMQPMessage.java | 13 +-
.../protocol/amqp/broker/AMQPStandardMessage.java | 2 +-
.../artemis/core/server/impl/QueueImpl.java | 1 +
.../openwire/amq/ValidateAddressSizeTest.java | 212 +++++++++++++++++++++
6 files changed, 229 insertions(+), 4 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 7bf8fbfbd7..0c87c08d9d 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -801,6 +801,9 @@ public interface Message {
int durableDown();
+ default void routed() {
+ }
+
/**
* {@return Returns the message in Map form, useful when encoding to JSON}
*/
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index c43fe56bc6..7aca908ad0 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -601,7 +601,7 @@ public class AMQPLargeMessage extends AMQPMessage
implements LargeServerMessage
@Override
- public int getMemoryEstimate() {
+ public synchronized int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset * 2 + (extraProperties != null ?
extraProperties.getEncodeSize() : 0);
originalEstimate = memoryEstimate;
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index dc46ae80ba..e07c38e60b 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -211,6 +211,12 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
protected byte priority = DEFAULT_MESSAGE_PRIORITY;
protected boolean isPaged;
+ protected volatile boolean routed = false;
+
+ @Override
+ public void routed() {
+ this.routed = true;
+ }
// The Proton based AMQP message section that are retained in memory, these
are the
// mutable portions of the Message as the broker sees it, although AMQP
defines that
@@ -537,7 +543,8 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
return
lazyDecodeApplicationProperties(getData().duplicate().position(0));
}
- protected ApplicationProperties
lazyDecodeApplicationProperties(ReadableBuffer data) {
+ // need to synchronize access to lazyDecodeApplicationProperties to avoid
clashes with getMemoryEstimate
+ protected synchronized ApplicationProperties
lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition !=
VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(data,
applicationPropertiesPosition, ApplicationProperties.class);
if (owner != null && memoryEstimate != -1) {
@@ -546,7 +553,9 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
// it is difficult to track the updates for paged messages
// for that reason we won't do it if paged
- if (!isPaged) {
+ // we also only do the update if the message was previously routed,
+ // because if a debug method or an interceptor changed the size before
routing, we would get a different size
+ if (!isPaged && routed) {
((PagingStore) owner).addSize(addition, false);
final int updatedEstimate = memoryEstimate + addition;
memoryEstimate = updatedEstimate;
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
index c5f2e14fe5..c514009bce 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
@@ -190,7 +190,7 @@ public class AMQPStandardMessage extends AMQPMessage {
}
@Override
- public int getMemoryEstimate() {
+ public synchronized int getMemoryEstimate() {
if (memoryEstimate == -1) {
if (isPaged) {
// When the message is paged, we don't take the unmarshalled
application properties because it could be
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2f077a0ac0..8acf1f755d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -744,6 +744,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (count == 1) {
if (owner != null) {
owner.addSize(messageReference.getMessageMemoryEstimate(), false);
+ messageReference.getMessage().routed();
}
}
if (pagingStore != null) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
new file mode 100644
index 0000000000..6e1b096f00
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
+import
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ValidateAddressSizeTest extends BasicOpenWireTest {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @BeforeEach
+ @Override
+ public void setUp() throws Exception {
+ // making data persistent makes it easier to debug it with print-data
+ this.realStore = true;
+ super.setUp();
+ }
+
+
+ @Override
+ protected void extraServerConfig(Configuration serverConfig) {
+ Set<TransportConfiguration> acceptors =
serverConfig.getAcceptorConfigurations();
+ for (TransportConfiguration tc : acceptors) {
+ if (tc.getName().equals("netty")) {
+ tc.getExtraParams().put("virtualTopicConsumerWildcards",
"Consumer.*.>;2,C.*.>;2;selectorAware=true");
+ }
+ }
+ }
+
+ @Test
+ public void testValidateSizeAfterConsumption() throws Exception {
+ internalTest(false);
+ }
+
+ @Test
+ public void testValidateSizeChangeMessageEstimate() throws Exception {
+ internalTest(true);
+ }
+
+ private void internalTest(boolean changeWithPlugin) throws Exception {
+ ConnectionFactory amqpCF = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:61616");
+ ConnectionFactory owCF = CFUtil.createConnectionFactory("OPENWIRE",
"tcp://localhost:61616");
+
+ String topicName = "topicTest";
+
+ server.addAddressInfo(new
AddressInfo(topicName).addRoutingType(RoutingType.MULTICAST));
+
+ int messageBodySize = 100;
+ int largeMessageSize = 101 * 1024;
+ String messageBody = "a".repeat(messageBodySize);
+ String largeMessageBody = "a".repeat(largeMessageSize);
+
+ int consumers = 10;
+ int numberOfMessages = 10;
+ int numberOfLargeMessages = 5;
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(consumers);
+ runAfter(executorService::shutdownNow);
+
+ CyclicBarrier startFlag = new CyclicBarrier(consumers + 1);
+
+ String endbody = "EndNow";
+
+ CountDownLatch done = new CountDownLatch(consumers);
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ AtomicBoolean running = new AtomicBoolean(true);
+ runAfter(() -> running.set(false));
+
+ for (int i = 1; i <= consumers; i++) {
+ final int consumerID = i;
+ executorService.execute(() -> {
+ try (Connection owConnection = owCF.createConnection()) {
+ owConnection.setClientID("ow" + consumerID);
+ Session owSession = owConnection.createSession(false,
ActiveMQSession.CLIENT_ACKNOWLEDGE);
+ owConnection.start();
+ MessageConsumer consumer;
+ Topic topic = owSession.createTopic(topicName);
+ consumer = owSession.createDurableSubscriber(topic, "cons_" +
consumerID);
+ startFlag.await(10, TimeUnit.SECONDS);
+ while (running.get()) {
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ if (message != null) {
+ message.acknowledge();
+ if (message.getText().equals(endbody)) {
+ break;
+ }
+ }
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ // aligning everybody after consumers are created
+ startFlag.await(10, TimeUnit.SECONDS);
+
+ PagingStoreImpl store = (PagingStoreImpl)
server.getPagingManager().getPageStore(SimpleString.of(topicName));
+ assertNotNull(store);
+
+ if (changeWithPlugin) {
+ server.getBrokerMessagePlugins().add(new
ActiveMQServerMessagePlugin() {
+ @Override
+ public void beforeMessageRoute(Message message,
+ RoutingContext context,
+ boolean direct,
+ boolean rejectDuplicates) throws
ActiveMQException {
+ // this is introducing a race that could happen
+ message.getMemoryEstimate();
+ message.setOwner(store);
+ // to force lazy decode
+ message.getPropertyNames();
+ // this is meant to force a toString, which is what would happen on a debug
message
+ logger.debug("message {}", String.valueOf(message));
+ }
+ });
+ }
+
+ try (Connection amqpConnection = amqpCF.createConnection()) {
+ Session amqpSession = amqpConnection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
amqpSession.createProducer(amqpSession.createTopic(topicName));
+ for (int i = 0; i < numberOfMessages + numberOfLargeMessages; i++) {
+ TextMessage message;
+ if (i < numberOfMessages) {
+ logger.info("message {}", i);
+ message = amqpSession.createTextMessage(messageBody);
+ } else {
+ logger.info("largemessage {}", i);
+ message = amqpSession.createTextMessage(largeMessageBody);
+ }
+ message.setIntProperty("i", i);
+ message.setStringProperty("myvalue", "2");
+ producer.send(message);
+ }
+ TextMessage endMessage = amqpSession.createTextMessage(endbody);
+ endMessage.setStringProperty("end", "theEnd");
+ endMessage.setStringProperty("myvalue", "2");
+ producer.send(endMessage);
+ amqpSession.commit();
+ }
+
+ assertTrue(done.await(1, TimeUnit.MINUTES));
+ assertEquals(0, errors.get());
+ running.set(false);
+
+ try {
+ Wait.assertEquals(0L, store::getAddressSize, 5000, 100);
+ } catch (Throwable e) {
+ logger.warn("error -> {}", e.getMessage(), e);
+ throw e;
+ }
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact