This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c6f227cbb8 ARTEMIS-4820 Read Header TTL as unsigned integer to set
expiration
c6f227cbb8 is described below
commit c6f227cbb883469247dd7c2389493be579ad35a0
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Jun 17 13:54:44 2024 -0400
ARTEMIS-4820 Read Header TTL as unsigned integer to set expiration
When setting expiration on the AMQPMessage the AMQP header TTL value
should be read as an unsigned integer and as such should use the longValue
API of UnsignedInteger to get the right value to set expiration.
---
.../artemis/protocol/amqp/broker/AMQPMessage.java | 2 +-
.../protocol/amqp/broker/AMQPMessageTest.java | 24 ++++++++++
.../integration/amqp/AmqpExpiredMessageTest.java | 55 ++++++++++++++++++++--
3 files changed, 76 insertions(+), 5 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 1490edc2ed..4c43401fac 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -705,7 +705,7 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
encodedHeaderSize = data.position() - constructorPos;
if (header.getTtl() != null) {
if (!expirationReload) {
- expiration = System.currentTimeMillis() +
header.getTtl().intValue();
+ expiration = System.currentTimeMillis() +
header.getTtl().longValue();
}
}
} else if
(DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 8ed39125c2..3c917fe349 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -1027,6 +1027,18 @@ public class AMQPMessageTest {
assertTrue(decoded.getExpiration() > System.currentTimeMillis());
}
+ @Test
+ public void testGetExpirationFromMessageWithMaxUIntTTL() {
+ final long ttl = UnsignedInteger.MAX_VALUE.longValue();
+
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+ protonMessage.setHeader(new Header());
+ protonMessage.setTtl(ttl);
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+ assertTrue(decoded.getExpiration() > System.currentTimeMillis());
+ }
+
@Test
public void testGetExpirationFromCoreMessageUsingTTL() {
final long ttl = 100000;
@@ -1094,6 +1106,18 @@ public class AMQPMessageTest {
assertEquals(expirationTime.getTime(), decoded.getExpiration());
}
+ @Test
+ public void testSetExpirationMaxUInt() {
+ final Date expirationTime = new
Date(UnsignedInteger.MAX_VALUE.longValue());
+
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+ AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+ assertEquals(0, decoded.getExpiration());
+ decoded.setExpiration(expirationTime.getTime());
+ assertEquals(expirationTime.getTime(), decoded.getExpiration());
+ }
+
@Test
public void testSetExpirationUpdatesProperties() {
final Date originalExpirationTime = new Date(System.currentTimeMillis());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index 1429700209..604a9882da 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -53,6 +53,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
@@ -477,6 +478,55 @@ public class AmqpExpiredMessageTest extends
AmqpClientTestSupport {
connection.close();
}
+ @Test
+ @Timeout(60)
+ public void testSendMessageThatIsNotExpiredUsingTimeToLiveOfMaxUInt()
throws Exception {
+
doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger.MAX_VALUE);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSendMessageThatIsNotExpiredUsingTimeToLiveOfMaxIntValue()
throws Exception {
+
doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger.valueOf(Integer.MAX_VALUE));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSendMessageThatIsNotExpiredUsingTimeToLiveOfMinusOne()
throws Exception {
+
doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger.valueOf(-1));
+ }
+
+ private void
doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger ttl) throws
Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getQueueName());
+
+ // Get the Queue View early to avoid racing the delivery.
+ final Queue queueView = getProxyToQueue(getQueueName());
+ assertNotNull(queueView);
+
+ AmqpMessage message = new AmqpMessage();
+ message.setTimeToLive(ttl.longValue());
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.close();
+
+ Wait.assertEquals(1, queueView::getMessageCount);
+
+ // Now try and get the message
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received, "Should have read message but it seems to have
timed out.");
+ assertEquals(ttl.longValue(), received.getTimeToLive());
+
+ Wait.assertEquals(0, queueView::getMessagesExpired);
+
+ connection.close();
+ }
+
@Test
@Timeout(60)
public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws
Exception {
@@ -829,13 +879,10 @@ public class AmqpExpiredMessageTest extends
AmqpClientTestSupport {
MessageReference ref = linkedListIterator.next();
String idUsed = ref.getMessage().getStringProperty("id");
long originalExpiration = dataSet.get(idUsed);
- System.out.println("original Expiration = " + originalExpiration + "
while this expiration = " + ref.getMessage().getExpiration());
+ logger.info("original Expiration = {} while this expiration = {}",
originalExpiration, ref.getMessage().getExpiration());
assertEquals(originalExpiration, ref.getMessage().getExpiration());
}
assertEquals(2, count);
linkedListIterator.close();
-
-
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact