Author: robbie
Date: Fri Dec 10 15:00:27 2010
New Revision: 1044387

URL: http://svn.apache.org/viewvc?rev=1044387&view=rev
Log:
QPID-2973: create the StoreContext upfront for one-time use at later points, 
instead of creating them as required. Remove discarded message tag from the 
unacked map instead of allowing it to remain until the map is later cleared

Modified:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1044387&r1=1044386&r2=1044387&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Dec 10 15:00:27 2010
@@ -1112,11 +1112,14 @@ public class AMQChannel
 
             AMQQueue queue = rejectedQueueEntry.getQueue();
             Exchange altExchange = queue.getAlternateExchange();
-            
+
+            StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DL deliveryTag: " + deliveryTag);
+
             if (altExchange == null)
             {
                 _log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
-                rejectedQueueEntry.discard(new StoreContext());
+                unackedMap.remove(deliveryTag);
+                rejectedQueueEntry.discard(dlqStoreContext);
                 return;
             }
 
@@ -1127,17 +1130,17 @@ public class AMQChannel
             if (destinationQueues == null || destinationQueues.isEmpty())
             {
                 _log.warn("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
-                rejectedQueueEntry.discard(new StoreContext());
+                unackedMap.remove(deliveryTag);
+                rejectedQueueEntry.discard(dlqStoreContext);
                 return;
             }
-            
+
             //increment the message reference count to include the new queue(s)
             msg.incrementReference(destinationQueues.size());
-            
-            //create a new storeContext to use with a new TransactionContext for the DLQ process
-            StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DLQ deliveryTag: " + deliveryTag);
+
+            //create a new storeContext to use with the TransactionContext for the DLQ process
             DLQTransactionalContext dlqTxnContext = new DLQTransactionalContext(this, dlqStoreContext);
-            
+
             //enqueue the message on the new queues in the store if its persistent
             if (msg.isPersistent())
             {
@@ -1157,9 +1160,8 @@ public class AMQChannel
             {
                 dlqTxnContext.deliver(destinationQueues.get(i), msg);
             }
-            
+
             dlqTxnContext.commit();
-            
         }
     }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1044387&r1=1044386&r2=1044387&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Fri Dec 10 15:00:27 2010
@@ -242,6 +242,8 @@ public class AMQQueueFactory
                 //ensure the queue is bound to the exchange
                 if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
                 {
+                    //actual routing key used does not matter due to use of fanout exchange,
+                    //but we will make the key 'dlq' as it can be logged at creation.
                     dlQueue.bind(dlExchange, DLQ_ROUTING_KEY, null);
                 }
                 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to