Repository: activemq Updated Branches: refs/heads/master b9f9f0382 -> b9b98a45c
https://issues.apache.org/jira/browse/AMQ-6256 Moving beforeMarshall call out of the store and into the actual destination Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b9b98a45 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b9b98a45 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b9b98a45 Branch: refs/heads/master Commit: b9b98a45cee484b112dadeffa2d9a874c4ffe280 Parents: b9f9f03 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Fri Apr 15 14:17:00 2016 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Fri Apr 15 14:17:00 2016 +0000 ---------------------------------------------------------------------- .../src/main/java/org/apache/activemq/broker/region/Queue.java | 1 + .../src/main/java/org/apache/activemq/broker/region/Topic.java | 1 + .../main/java/org/apache/activemq/store/kahadb/KahaDBStore.java | 3 +-- .../src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala | 1 - 4 files changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/b9b98a45/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index f5cd80f..9578c7d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -838,6 +838,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index //condition if the original add is processed after the update, which can cause //a duplicate message to be stored if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) { + message.beforeMarshall(null); result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); final PendingMarshalUsageTracker tracker = new PendingMarshalUsageTracker(message); result.addListener(new Runnable() { http://git-wip-us.apache.org/repos/asf/activemq/blob/b9b98a45/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 1a9949e..a13bcd5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -511,6 +511,7 @@ public class Topic extends BaseDestination implements Task { waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); } + message.beforeMarshall(null); result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); } http://git-wip-us.apache.org/repos/asf/activemq/blob/b9b98a45/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 7f8283d..e1c1df4 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -66,6 +66,7 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionIdTransformer; import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.kahadb.MessageDatabase.Metadata; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; @@ -383,7 +384,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException { if (isConcurrentStoreAndDispatchQueues()) { - message.beforeMarshall(wireFormat); StoreQueueTask result = new StoreQueueTask(this, context, message); ListenableFuture<Object> future = result.getFuture(); message.getMessageId().setFutureOrSequenceLong(future); @@ -754,7 +754,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException { if (isConcurrentStoreAndDispatchTopics()) { - message.beforeMarshall(wireFormat); StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); result.aquireLocks(); addTopicTask(this, result); http://git-wip-us.apache.org/repos/asf/activemq/blob/b9b98a45/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 5865f35..f80e722 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -754,7 +754,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { check_running - message.beforeMarshall(wireFormat); message.incrementReferenceCount() uow.addCompleteListener({ message.decrementReferenceCount()