This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new fb4c68681e ARTEMIS-4095: fix delivering message size accounting
fb4c68681e is described below
commit fb4c68681edde5225980b1acf0935af124d0b766
Author: Artyom Tarasenko <[email protected]>
AuthorDate: Fri Jun 23 17:55:26 2023 +0200
ARTEMIS-4095: fix delivering message size accounting
Signed-off-by: Artyom Tarasenko <[email protected]>
---
.../core/protocol/openwire/amq/AMQConsumer.java | 4 +---
.../protocol/openwire/amq/AMQConsumerTest.java | 27 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index a98369f48e..3165b14faf 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -288,9 +288,7 @@ public class AMQConsumer {
return 0;
}
- if (session.getConnection().isNoLocal() || session.isInternal()) {
- //internal session always delivers messages to noLocal advisory consumers
- //so we need to remove this property too.
+ if (session.getConnection().isNoLocal() || (session.isInternal() && AdvisorySupport.isAdvisoryTopic(openwireDestination))) {
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
}
//handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat()
diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
index 18dcfc055f..7992c4f040 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
@@ -16,29 +16,55 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
+import static org.junit.Assert.assertEquals;
+
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.wireformat.WireFormat;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
public class AMQConsumerTest {
+ final OpenWireFormatFactory formatFactory = new OpenWireFormatFactory();
+ final WireFormat openWireFormat = formatFactory.createWireFormat();
+
+ @Test
+ public void testClientId() throws Exception {
+ final String CID_ID = "client-12345-6789012345678-0:-1";
+
+ ActiveMQMessage classicMessage = new ActiveMQMessage();
+ classicMessage.setProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING, CID_ID);
+ Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null);
+ assertEquals(CID_ID, artemisMessage.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING));
+ MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
+ AMQConsumer amqConsumer = getConsumer(0);
+ amqConsumer.handleDeliver(messageReference, (ICoreMessage) artemisMessage);
+ assertEquals(CID_ID, artemisMessage.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING));
+ }
@Test
public void testCreditsWithPrefetch() throws Exception {
@@ -69,6 +95,7 @@ public class AMQConsumerTest {
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.nullable(Integer.class))).thenReturn(Mockito.mock(ServerConsumerImpl.class));
AMQSession session = Mockito.mock(AMQSession.class);
+ Mockito.when(session.isInternal()).thenReturn(true);
Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class));
Mockito.when(session.getCoreServer()).thenReturn(coreServer);
Mockito.when(session.getCoreSession()).thenReturn(coreSession);