This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new 9f4ac4c ARTEMIS-2214 Cache durable&deliveryTime in PagedReference new 9cbe451 This closes #2482 9f4ac4c is described below commit 9f4ac4cb9f99059214e4eb03a3579d53d5bb0e67 Author: Qihong Xu <qihon...@bu.edu> AuthorDate: Fri Jan 4 16:57:36 2019 +0800 ARTEMIS-2214 Cache durable&deliveryTime in PagedReference --- .../core/paging/cursor/PagedReferenceImpl.java | 19 ++++++++++++++++++- .../artemis/core/server/MessageReference.java | 2 ++ .../artemis/core/server/impl/LastValueQueue.java | 5 +++++ .../core/server/impl/MessageReferenceImpl.java | 5 +++++ .../activemq/artemis/core/server/impl/QueueImpl.java | 4 ++-- .../core/server/impl/QueuePendingMessageMetrics.java | 4 ++-- 6 files changed, 34 insertions(+), 5 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 893e3a7..0f265f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -77,6 +77,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> private Consumer<? super MessageReference> onDelivery; + //Durable field : 0 is false, 1 is true, -1 not defined + private static final byte IS_NOT_DURABLE = 0; + private static final byte IS_DURABLE = 1; + private static final byte UNDEFINED_IS_DURABLE = -1; + private byte durable = UNDEFINED_IS_DURABLE; + @Override public Object getProtocolData() { return protocolData; @@ -144,7 +150,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> this.largeMessage = message.getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE; this.transactionID = message.getTransactionID(); this.messageID = message.getMessage().getMessageID(); - + this.durable = message.getMessage().isDurable() ? IS_DURABLE : IS_NOT_DURABLE; + this.deliveryTime = message.getMessage().getScheduledDeliveryTime(); //pre-cache the message size so we don't have to reload the message later if it is GC'd getPersistentSize(); } else { @@ -152,6 +159,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> this.transactionID = -2; this.messageID = -1; this.messageSize = -1; + this.durable = UNDEFINED_IS_DURABLE; + this.deliveryTime = UNDEFINED_DELIVERY_TIME; } } @@ -387,4 +396,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> return messageSize; } + @Override + public boolean isDurable() { + if (durable == UNDEFINED_IS_DURABLE) { + durable = getMessage().isDurable() ? IS_DURABLE : IS_NOT_DURABLE; + } + return durable == IS_DURABLE; + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 905f93d..2dd8fc3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -44,6 +44,8 @@ public interface MessageReference { long getMessageID(); + boolean isDurable(); + SimpleString getLastValueProperty(); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 0ebd7a8..28e3ee3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -306,6 +306,11 @@ public class LastValueQueue extends QueueImpl { } @Override + public boolean isDurable() { + return getMessage().isDurable(); + } + + @Override public SimpleString getLastValueProperty() { return prop; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 12acffd..97bb7f6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -190,6 +190,11 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm } @Override + public boolean isDurable() { + return getMessage().isDurable(); + } + + @Override public void handled() { queue.referenceHandled(this); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 1752eb2..6c430bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2767,7 +2767,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return true; } - if (!internalQueue && message.isDurable() && isDurableMessage() && !reference.isPaged()) { + if (!internalQueue && reference.isDurable() && isDurableMessage() && !reference.isPaged()) { storageManager.updateDeliveryCount(reference); } @@ -2796,7 +2796,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { reference.setScheduledDeliveryTime(timeBase + redeliveryDelay); - if (!reference.isPaged() && message.isDurable() && isDurableMessage()) { + if (!reference.isPaged() && reference.isDurable() && isDurableMessage()) { storageManager.updateScheduledDeliveryTime(reference); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java index a657bae..668a918 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java @@ -58,7 +58,7 @@ public class QueuePendingMessageMetrics { long size = getPersistentSize(reference); COUNT_UPDATER.incrementAndGet(this); SIZE_UPDATER.addAndGet(this, size); - if (queue.isDurable() && reference.getMessage().isDurable()) { + if (queue.isDurable() && reference.isDurable()) { DURABLE_COUNT_UPDATER.incrementAndGet(this); DURABLE_SIZE_UPDATER.addAndGet(this, size); } @@ -68,7 +68,7 @@ public class QueuePendingMessageMetrics { long size = -getPersistentSize(reference); COUNT_UPDATER.decrementAndGet(this); SIZE_UPDATER.addAndGet(this, size); - if (queue.isDurable() && reference.getMessage().isDurable()) { + if (queue.isDurable() && reference.isDurable()) { DURABLE_COUNT_UPDATER.decrementAndGet(this); DURABLE_SIZE_UPDATER.addAndGet(this, size); }