Author: cctrieloff
Date: Tue Dec 16 13:41:01 2008
New Revision: 727166

URL: http://svn.apache.org/viewvc?rev=727166&view=rev
Log:
LVQ queue option for no acquire

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=727166&r1=727165&r2=727166&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Dec 16 13:41:01 2008
@@ -61,6 +61,7 @@
 const std::string qpidTraceIdentity("qpid.trace.id");
 const std::string qpidTraceExclude("qpid.trace.exclude");
 const std::string qpidLastValueQueue("qpid.last_value_queue");
+const std::string qpidLastValueQueueNoAcquire("qpid.last_value_queue_no_acquire");
 const std::string qpidPersistLastNode("qpid.persist_last_node");
 const std::string qpidVQMatchProperty("qpid.LVQ_key");
 }
@@ -79,6 +80,7 @@
     exclusive(0),
     noLocal(false),
     lastValueQueue(false),
+    lastValueQueueNoAcquire(false),
     persistLastNode(false),
     inLastNodeFailure(false),
     persistenceId(0),
@@ -213,7 +215,7 @@
             || (lastValueQueue && (i->position == msg.position) && 
                 msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
 
-            clearLVQIndex(msg);
+            if (!lastValueQueueNoAcquire) clearLVQIndex(msg);
             messages.erase(i);
             QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position);
             return true;
@@ -673,6 +675,12 @@
     lastValueQueue= _settings.get(qpidLastValueQueue);
     if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
 
+    lastValueQueueNoAcquire = _settings.get(qpidLastValueQueueNoAcquire);
+    if (lastValueQueueNoAcquire){
+        QPID_LOG(debug, "Configured queue as Last Value Queue No Acquire");
+        lastValueQueue = lastValueQueueNoAcquire;
+    }
+    
     persistLastNode= _settings.get(qpidPersistLastNode);
     if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=727166&r1=727165&r2=727166&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Dec 16 13:41:01 2008
@@ -76,6 +76,7 @@
             OwnershipToken* exclusive;
             bool noLocal;
             bool lastValueQueue;
+            bool lastValueQueueNoAcquire;
             bool persistLastNode;
             bool inLastNodeFailure;
             std::string traceId;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp?rev=727166&r1=727165&r2=727166&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp Tue Dec 16 13:41:01 2008
@@ -38,6 +38,7 @@
 const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue");
 const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node");
 const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key");
+const std::string QueueOptions::strLastValueQueueNoAcquire("qpid.last_value_queue_no_acquire");
 
 
 QueueOptions::~QueueOptions()
@@ -79,7 +80,9 @@
 {
     if (op == LVQ){
         setInt(strLastValueQueue, 1); 
-    }else{
+    }else if (op == LVQ_NO_ACQUIRE){
+        setInt(strLastValueQueueNoAcquire, 1); 
+    }else {
         clearOrdering();
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h?rev=727166&r1=727165&r2=727166&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h Tue Dec 16 13:41:01 2008
@@ -27,7 +27,7 @@
 namespace client {
 
 enum QueueSizePolicy {NONE, REJECT, FLOW_TO_DISK, RING, RING_STRICT};
-enum QueueOrderingPolicy {FIFO, LVQ};  
+enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_ACQUIRE};  
 
 /**
  * A help class to set options on the Queue. Create a configured args while
@@ -94,6 +94,7 @@
     static const std::string strLastValueQueue;
     static const std::string strPersistLastNode;
     static const std::string strLVQMatchProperty;
+    static const std::string strLastValueQueueNoAcquire;
 };
 
 }

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=727166&r1=727165&r2=727166&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Dec 16 13:41:01 2008
@@ -370,6 +370,7 @@
     intrusive_ptr<Message> msg3 = create_message("e", "C");
     intrusive_ptr<Message> msg4 = create_message("e", "D");
     intrusive_ptr<Message> msg5 = create_message("e", "F");
+    intrusive_ptr<Message> msg6 = create_message("e", "G");
 
     //set deliever match for LVQ a,b,c,a
 
@@ -383,6 +384,7 @@
     msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
     msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
     msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+    msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
        
     //enqueue 4 message
     queue->deliver(msg1);
@@ -403,6 +405,12 @@
     
     queue->deliver(msg5);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+    // set mode to no acquire and check
+    args.setOrdering(client::LVQ_NO_ACQUIRE);
+    queue->configure(args);
+    queue->deliver(msg6);
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
  
 }
 


Reply via email to