Author: gsim
Date: Wed Jan  7 12:50:35 2009
New Revision: 732482

URL: http://svn.apache.org/viewvc?rev=732482&view=rev
Log:
Ensure that if no store is loaded we don't flow to disk, but revert to 
rejecting messages.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=732482&r1=732481&r2=732482&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Wed Jan  7 12:50:35 2009
@@ -20,6 +20,7 @@
  */
 
 #include "MessageStoreModule.h"
+#include "NullMessageStore.h"
 #include <iostream>
 
 // This transfer protects against the unloading of the store lib prior to the handling of the exception
@@ -165,4 +166,9 @@
     TRANSFER_EXCEPTION(store->collectPreparedXids(xids));
 }
 
+bool MessageStoreModule::isNull() const
+{
+    return NullMessageStore::isNullStore(store);
+}
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=732482&r1=732481&r2=732482&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Wed Jan  7 12:50:35 2009
@@ -73,7 +73,8 @@
                  const PersistableQueue& queue);
     uint32_t outstandingQueueAIO(const PersistableQueue& queue);
     void flush(const qpid::broker::PersistableQueue& queue);
-        
+    bool isNull() const;
+
     ~MessageStoreModule();
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=732482&r1=732481&r2=732482&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Wed Jan  7 12:50:35 2009
@@ -20,8 +20,10 @@
  */
 
 #include "NullMessageStore.h"
+#include "MessageStoreModule.h"
 #include "RecoveryManager.h"
 #include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
 
 #include <iostream>
 
@@ -90,7 +92,10 @@
 
 void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&,
                                    const intrusive_ptr<const PersistableMessage>&,
-                                   string&, uint64_t, uint32_t) {}
+                                   string&, uint64_t, uint32_t) 
+{
+    throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled");
+}
 
 void NullMessageStore::enqueue(TransactionContext*,
                                const intrusive_ptr<PersistableMessage>& msg,
@@ -149,8 +154,13 @@
 
 bool NullMessageStore::isNullStore(const MessageStore* store)
 {
-    const NullMessageStore* test = dynamic_cast<const NullMessageStore*>(store);
-    return test && test->isNull();
+    const MessageStoreModule* wrapper = dynamic_cast<const MessageStoreModule*>(store);
+    if (wrapper) {
+        return wrapper->isNull();
+    } else {
+        const NullMessageStore* test = dynamic_cast<const NullMessageStore*>(store);
+        return test && test->isNull();
+    }
 }
 
 }}  // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=732482&r1=732481&r2=732482&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Wed Jan  7 12:50:35 2009
@@ -22,6 +22,7 @@
 #include "test_tools.h"
 
 #include "qpid/broker/QueuePolicy.h"
+#include "qpid/client/QueueOptions.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "MessageUtils.h"
@@ -242,5 +243,32 @@
     other.messageTransfer(arg::content=client::Message("Message_6", q));
 }
 
+QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore) 
+{
+    //Ensure that with no store loaded, we don't flow to disk but
+    //fallback to rejecting messages
+    QueueOptions args;
+    args.setSizePolicy(FLOW_TO_DISK, 0, 5);
+
+    ProxySessionFixture f;
+    std::string q("my-queue");
+    f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
+    LocalQueue incoming;
+    SubscriptionSettings settings(FlowControl::unlimited());
+    settings.autoAck = 0; // no auto ack.
+    Subscription sub = f.subs.subscribe(incoming, q, settings); 
+    for (int i = 0; i < 5; i++) {
+        f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+    }
+    for (int i = 0; i < 5; i++) {
+        BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+    }
+    try {
+        ScopedSuppressLogging sl; // Suppress messages for expected errors.
+        f.session.messageTransfer(arg::content=client::Message("Message_6", q));
+        BOOST_FAIL("expecting ResourceLimitExceededException.");
+    } catch (const ResourceLimitExceededException&) {}    
+}
+
 
 QPID_AUTO_TEST_SUITE_END()


Reply via email to