Fixing Scheduled Message
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ef95eaa8 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ef95eaa8 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ef95eaa8 Branch: refs/heads/artemis-1009 Commit: ef95eaa848bb8d73271d60c2030be336c3862713 Parents: 4246128 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Wed Mar 1 17:43:51 2017 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Mar 2 20:04:30 2017 -0500 ---------------------------------------------------------------------- .../activemq/artemis/api/core/Message.java | 3 + .../artemis/core/message/impl/CoreMessage.java | 11 ++++ .../protocol/amqp/broker/AMQPMessage.java | 69 ++++++++++++-------- .../core/postoffice/impl/PostOfficeImpl.java | 18 +++-- .../management/impl/ManagementServiceImpl.java | 3 +- .../impl/ScheduledDeliveryHandlerTest.java | 9 ++- .../tests/integration/amqp/ProtonTest.java | 49 +++++++------- .../integration/client/AcknowledgeTest.java | 5 ++ 8 files changed, 99 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index d96f232..b08202d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -169,6 +169,9 @@ public interface Message { void messageChanged(); + /** Used to calculate what is the delivery time. + * Return null if not scheduled. */ + Long getScheduledDeliveryTime(); /** Used for Large messages on Core. * Do not use this, it will go away http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 513b758..f620a1d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -153,6 +153,17 @@ public class CoreMessage extends RefCountMessage { } } + @Override + public Long getScheduledDeliveryTime() { + Object property = getObjectProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); + + if (property != null && property instanceof Number) { + return ((Number) property).longValue(); + } + + return null; + } + /** * {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index d39bf9d..772f2cd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; @@ -63,6 +64,7 @@ public class AMQPMessage extends RefCountMessage { private long expiration = 0; // this can be used to encode the header again and the rest of the message buffer private int headerEnd = -1; + private boolean parsedHeaders = false; private Header _header; private DeliveryAnnotations _deliveryAnnotations; private MessageAnnotations _messageAnnotations; @@ -142,39 +144,55 @@ public class AMQPMessage extends RefCountMessage { } private ApplicationProperties getApplicationProperties() { - if (applicationProperties == null) { - if (data != null) { - partialDecode(data.nioBuffer(), true); - } else { - initalizeObjects(); - } - } - + parseHeaders(); return applicationProperties; } - public Header getHeader() { - if (_header == null) { + private void parseHeaders() { + if (!parsedHeaders) { if (data == null) { initalizeObjects(); } else { - partialDecode(this.data.nioBuffer(), false); + partialDecode(data.nioBuffer()); } + parsedHeaders = true; } + } + public MessageAnnotations getMessageAnnotations() { + parseHeaders(); + return _messageAnnotations; + } + + public Header getHeader() { + parseHeaders(); return _header; } public Properties getProperties() { - if (_properties == null) { - if (data == null) { - initalizeObjects(); - } else { - partialDecode(this.data.nioBuffer(), true); - } + parseHeaders(); + return _properties; + } + + private Object getSymbol(String symbol) { + MessageAnnotations annotations = getMessageAnnotations(); + Map mapAnnotations = annotations != null ? annotations.getValue() : null; + if (mapAnnotations != null) { + return mapAnnotations.get(Symbol.getSymbol("x-opt-delivery-time")); } - return _properties; + return null; + } + + @Override + public Long getScheduledDeliveryTime() { + + Object scheduledTime = getSymbol("x-opt-delivery-time"); + if (scheduledTime != null && scheduledTime instanceof Number) { + return ((Number)scheduledTime).longValue(); + } + + return null; } @Override @@ -182,7 +200,7 @@ public class AMQPMessage extends RefCountMessage { return AMQPMessagePersister.getInstance(); } - private synchronized void partialDecode(ByteBuffer buffer, boolean readApplicationProperties) { + private synchronized void partialDecode(ByteBuffer buffer) { DecoderImpl decoder = TLSEncode.getDecoder(); decoder.setByteBuffer(buffer); buffer.position(0); @@ -207,11 +225,7 @@ public class AMQPMessage extends RefCountMessage { this.expiration = System.currentTimeMillis() + _header.getTtl().intValue(); } - if (!readApplicationProperties) { - return; - } - - if (buffer.hasRemaining() && readApplicationProperties) { + if (buffer.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -220,10 +234,6 @@ public class AMQPMessage extends RefCountMessage { // meaning there is no header headerEnd = 0; } - - if (!readApplicationProperties) { - return; - } if (section instanceof DeliveryAnnotations) { _deliveryAnnotations = (DeliveryAnnotations) section; @@ -254,6 +264,7 @@ public class AMQPMessage extends RefCountMessage { } } + if (section instanceof ApplicationProperties) { applicationProperties = (ApplicationProperties) section; } @@ -785,7 +796,7 @@ public class AMQPMessage extends RefCountMessage { @Override public org.apache.activemq.artemis.api.core.Message toCore() { MessageImpl protonMessage = getProtonMessage(); - return null; + throw new IllegalStateException("conversion between AMQP and Core not implemented yet!"); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 81a83ac..d23185e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1079,6 +1079,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding Transaction tx = context.getTransaction(); + Long deliveryTime = message.getScheduledDeliveryTime(); + for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) { PagingStore store = pagingManager.getPageStore(entry.getKey()); @@ -1095,12 +1097,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding for (Queue queue : entry.getValue().getNonDurableQueues()) { MessageReference reference = MessageReference.Factory.createReference(message, queue); - refs.add(reference); - if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { - Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); - - reference.setScheduledDeliveryTime(scheduledDeliveryTime); + if (deliveryTime != null) { + reference.setScheduledDeliveryTime(deliveryTime); } + refs.add(reference); message.incrementRefCount(); } @@ -1119,13 +1119,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - refs.add(reference); - - if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { - Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); - reference.setScheduledDeliveryTime(scheduledDeliveryTime); + if (deliveryTime != null) { + reference.setScheduledDeliveryTime(deliveryTime); } + refs.add(reference); if (message.isDurable()) { int durableRefCount = message.incrementDurableRefCount(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 002b2c7..5b2bf28 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -365,7 +365,8 @@ public class ManagementServiceImpl implements ManagementService { } @Override - public Message handleMessage(final Message message) throws Exception { + public Message handleMessage(Message message) throws Exception { + message = message.toCore(); // a reply message is sent with the result stored in the message body. Message reply = new CoreMessage(storageManager.generateID(), 512); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 4da2e63..5b44572 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -35,8 +35,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; - import org.apache.activemq.artemis.api.core.RefCountMessage; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.encode.BodyType; import org.apache.activemq.artemis.core.filter.Filter; @@ -47,8 +47,6 @@ import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.api.core.RoutingType; - import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.LinkedListIterator; @@ -293,6 +291,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public Long getScheduledDeliveryTime() { + return null; + } + + @Override public void reloadPersistence(ActiveMQBuffer record) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index aa1bdc4..4640c33 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -16,28 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains; - -import java.io.IOException; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -61,7 +39,24 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.Configuration; @@ -71,14 +66,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; -import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -105,6 +99,11 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains; + @RunWith(Parameterized.class) public class ProtonTest extends ProtonTestBase { @@ -379,7 +378,7 @@ public class ProtonTest extends ProtonTestBase { receiver.flow(1); // Shouldn't get this since we delayed the message. - assertNull(receiver.receive(5, TimeUnit.SECONDS)); + assertNull(receiver.receive(1, TimeUnit.SECONDS)); } finally { connection.close(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 442d6e9..40f2ebd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -364,6 +364,11 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override + public Long getScheduledDeliveryTime() { + return null; + } + + @Override public Message toCore() { return this; }