Author: robbie Date: Fri Dec 10 15:00:27 2010 New Revision: 1044387 URL: http://svn.apache.org/viewvc?rev=1044387&view=rev Log: QPID-2973: Create the StoreContext upfront for one-time use at later points, instead of creating a new one each time it is required. Remove the discarded message's delivery tag from the unacked map instead of allowing it to remain until the map is later cleared.
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1044387&r1=1044386&r2=1044387&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Dec 10 15:00:27 2010 @@ -1112,11 +1112,14 @@ public class AMQChannel AMQQueue queue = rejectedQueueEntry.getQueue(); Exchange altExchange = queue.getAlternateExchange(); - + + StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DL deliveryTag: " + deliveryTag); + if (altExchange == null) { _log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - rejectedQueueEntry.discard(new StoreContext()); + unackedMap.remove(deliveryTag); + rejectedQueueEntry.discard(dlqStoreContext); return; } @@ -1127,17 +1130,17 @@ public class AMQChannel if (destinationQueues == null || destinationQueues.isEmpty()) { _log.warn("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); - rejectedQueueEntry.discard(new StoreContext()); + unackedMap.remove(deliveryTag); + rejectedQueueEntry.discard(dlqStoreContext); return; } - + //increment the message reference count to include the new queue(s) msg.incrementReference(destinationQueues.size()); - - //create a new storeContext to use with a new TransactionContext 
for the DLQ process - StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DLQ deliveryTag: " + deliveryTag); + + //create a new storeContext to use with the TransactionContext for the DLQ process DLQTransactionalContext dlqTxnContext = new DLQTransactionalContext(this, dlqStoreContext); - + //enqueue the message on the new queues in the store if its persistent if (msg.isPersistent()) { @@ -1157,9 +1160,8 @@ public class AMQChannel { dlqTxnContext.deliver(destinationQueues.get(i), msg); } - + dlqTxnContext.commit(); - } } } Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1044387&r1=1044386&r2=1044387&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Fri Dec 10 15:00:27 2010 @@ -242,6 +242,8 @@ public class AMQQueueFactory //ensure the queue is bound to the exchange if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) { + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. dlQueue.bind(dlExchange, DLQ_ROUTING_KEY, null); } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org