Author: shuston
Date: Thu Nov 4 22:52:10 2010
New Revision: 1031325

URL: http://svn.apache.org/viewvc?rev=1031325&view=rev
Log:
Revised Messages to keep track of queues each message is on to enable finding all messages on a deleted queue. Removed this type of tracking from MSSqlClfsProvider.cpp.
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1031325&r1=1031324&r2=1031325&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Thu Nov 4 22:52:10 2010 @@ -21,6 +21,7 @@ #include <list> #include <map> +#include <set> #include <stdlib.h> #include <string> #include <windows.h> @@ -68,21 +69,6 @@ const std::string TblConfig("tblConfig") const std::string TblExchange("tblExchange"); const std::string TblQueue("tblQueue"); -/* - * Maintain a map of id -> QueueContents. RWlock protecting the map allows - * concurrent reads so multiple threads can get access to the needed queue; - * queue lock protects the QueueContents themselves. - */ -struct QueueContents { - typedef boost::shared_ptr<QueueContents> shared_ptr; - qpid::sys::Mutex lock; - std::list<uint64_t> messages; -}; - -typedef std::map<uint64_t, QueueContents::shared_ptr> QueuesMap; -qpid::sys::RWlock queuesLock; -QueuesMap queues; - } namespace qpid { @@ -494,12 +480,6 @@ MSSqlClfsProvider::create(PersistableQue db->beginTransaction(); rsQueues.open(db, TblQueue); rsQueues.add(queue); - { - // Db stuff ok so far; add an empty QueueContents for the queue. 
- QueueContents::shared_ptr entry(new QueueContents); - qpid::sys::ScopedWlock<qpid::sys::RWlock> l(queuesLock); - queues[queue.getPersistenceId()] = entry; - } db->commitTransaction(); } catch(_com_error &e) { @@ -539,44 +519,13 @@ MSSqlClfsProvider::destroy(PersistableQu } /* - * Now that the SQL stuff has recorded the queue deletion, reflect - * all the dequeued messages in memory. Don't worry about any errors - * that occur while reflecting these in the log because: - * - If we have to recover from this point (or anywhere from here - * until all messages are dequeued) there's no valid queue ID - * from the Enqueue record, so recovery will throw it out anyway. - * - If there is a failure before the SQL changes commit, the - * existing Enqueue records will replace the message on the - * queue during recovery. - * so, the best we could do by logging these dequeue operations is - * record something that will need to be ignored during recovery. - * - * Obtain a write lock to the queue map. Doing so gets this thread - * exclusive access to the queue map. This means no other thread can - * come while we're holding it and access, even for read, the list. - * However, there may already be other previously obtained references - * to the queue's message list outstanding, so also get the queue's - * list lock to serialize with any other threads. We should be able - * to count on the broker not making the destroy() call while other - * uses of the queue are outstanding, but play it safe. + * Now that the SQL stuff has recorded the queue deletion, expunge + * all record of the queue from the messages set. Any errors logging + * these removals are swallowed because during a recovery the queue + * Id won't be present (the SQL stuff already committed) so any references + * to it in message operations will be removed. 
*/ - std::list<uint64_t> affectedMessages; - uint64_t qId = queue.getPersistenceId(); - { - ::qpid::sys::RWlock::ScopedWlock l(queuesLock); - QueueContents::shared_ptr q = queues[qId]; - { - ::qpid::sys::Mutex::ScopedLock ql(q->lock); - affectedMessages = q->messages; - } - queues.erase(queues.find(qId)); - } - // Now tell each of the messages they are less one queue commitment. - Transaction::shared_ptr nonTransactional; - BOOST_FOREACH(uint64_t msgId, affectedMessages) { - QPID_LOG(debug, "Removing message " << msgId); - messages.dequeue(msgId, qId, nonTransactional); - } + messages.expunge(queue.getPersistenceId()); } /** @@ -856,25 +805,12 @@ MSSqlClfsProvider::enqueue(qpid::broker: if (tctx) t = tctx->getTransaction(); } - uint64_t qId = queue.getPersistenceId(); uint64_t msgId = msg->getPersistenceId(); - QueueContents::shared_ptr q; - { - qpid::sys::ScopedRlock<qpid::sys::RWlock> l(queuesLock); - QueuesMap::iterator i = queues.find(qId); - if (i == queues.end()) - THROW_STORE_EXCEPTION("Queue does not exist"); - q = i->second; - } if (msgId == 0) { messages.add(msg); msgId = msg->getPersistenceId(); } - messages.enqueue(msgId, qId, t); - { - qpid::sys::ScopedLock<qpid::sys::Mutex> ql(q->lock); - q->messages.push_back(msgId); - } + messages.enqueue(msgId, queue.getPersistenceId(), t); msg->enqueueComplete(); } @@ -902,21 +838,7 @@ MSSqlClfsProvider::dequeue(qpid::broker: if (tctx) t = tctx->getTransaction(); } - uint64_t qId = queue.getPersistenceId(); - uint64_t msgId = msg->getPersistenceId(); - QueueContents::shared_ptr q; - { - qpid::sys::ScopedRlock<qpid::sys::RWlock> l(queuesLock); - QueuesMap::const_iterator i = queues.find(qId); - if (i == queues.end()) - THROW_STORE_EXCEPTION("Queue does not exist"); - q = i->second; - } - messages.dequeue(msgId, qId, t); - { - qpid::sys::ScopedLock<qpid::sys::Mutex> ql(q->lock); - q->messages.remove(msgId); - } + messages.dequeue(msg->getPersistenceId(), queue.getPersistenceId(), t); msg->dequeueComplete(); } @@ 
-1063,8 +985,6 @@ MSSqlClfsProvider::recoverQueues(qpid::b recoverer.recoverQueue(blob); queue->setPersistenceId(id); queueMap[id] = queue; - QueueContents::shared_ptr entry(new QueueContents); - queues[id] = entry; p->MoveNext(); } } @@ -1085,9 +1005,28 @@ MSSqlClfsProvider::recoverMessages(qpid: MessageMap& messageMap, MessageQueueMap& messageQueueMap) { + // Read the list of valid queue Ids to ensure that no broken msg->queue + // refs get restored. + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + rsQueues.open(db, TblQueue); + _RecordsetPtr p = (_RecordsetPtr)rsQueues; + std::set<uint64_t> validQueues; + if (!(p->BOF && p->EndOfFile)) { + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + validQueues.insert(id); + p->MoveNext(); + } + } std::map<uint64_t, Transaction::shared_ptr> transMap; transactions->recover(transMap); - messages.recover(recoverer, messageMap, messageQueueMap, transMap); + messages.recover(recoverer, + validQueues, + transMap, + messageMap, + messageQueueMap); } void Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp?rev=1031325&r1=1031324&r2=1031325&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp Thu Nov 4 22:52:10 2010 @@ -24,6 +24,7 @@ #include "Messages.h" #include "Lsn.h" #include "qpid/store/StoreException.h" +#include <boost/foreach.hpp> namespace qpid { namespace store { @@ -67,37 +68,25 @@ Messages::enqueue(uint64_t msgId, uint64 THROW_STORE_EXCEPTION("Message does not exist"); p = i->second; } - // If transacted, it still needs to be counted as enqueued to ensure it - // is not deleted. Remember the transacted operation so it can be properly - // resolved later. 
- ::InterlockedIncrement(&p->enqueuedCount); - uint64_t transactionId = 0; - if (t.get() != 0) - transactionId = t->getId(); - if (transactionId != 0) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock); - p->transOps[t].push_back(MessageInfo::TRANSACTION_ENQUEUE); - t->enroll(msgId); - } - try { - log.recordEnqueue(msgId, queueId, transactionId); - } - catch (...) { - // Undo the record-keeping if the log wasn't written correctly. - ::InterlockedDecrement(&p->enqueuedCount); - if (transactionId != 0) { - t->unenroll(msgId); - qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock); - std::vector<MessageInfo::TransType> &oplist = p->transOps[t]; - std::vector<MessageInfo::TransType>::iterator i; - for (i = oplist.begin(); i < oplist.end(); ++i) { - if (*i == MessageInfo::TRANSACTION_ENQUEUE) { - oplist.erase(i); - break; - } - } + MessageInfo::Location loc(queueId, t, MessageInfo::TRANSACTION_ENQUEUE); + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + p->where.push_back(loc); + uint64_t transactionId = 0; + if (t.get() != 0) { + transactionId = t->getId(); + t->enroll(msgId); + } + try { + log.recordEnqueue(msgId, queueId, transactionId); + } + catch (...) { + // Undo the record-keeping if the log wasn't written correctly. + if (transactionId != 0) + t->unenroll(msgId); + p->where.pop_back(); + throw; } - throw; } } @@ -112,38 +101,46 @@ Messages::dequeue(uint64_t msgId, uint64 THROW_STORE_EXCEPTION("Message does not exist"); p = i->second; } - // Remember the transacted operation so it can be properly resolved later. - uint64_t transactionId = 0; - if (t.get() != 0) - transactionId = t->getId(); - if (transactionId != 0) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock); - p->transOps[t].push_back(MessageInfo::TRANSACTION_DEQUEUE); - t->enroll(msgId); - } - try { - log.recordDequeue(msgId, queueId, transactionId); - } - catch(...) { + { + // Locate the 'where' entry for the specified queue. 
Once this operation + // is recorded in the log, update the 'where' entry to reflect it. + // Note that an existing entry in 'where' that refers to a transaction + // is not eligible for this operation. + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + std::list<MessageInfo::Location>::iterator i; + for (i = p->where.begin(); i != p->where.end(); ++i) { + if (i->queueId == queueId && i->transaction.get() == 0) + break; + } + if (i == p->where.end()) + THROW_STORE_EXCEPTION("Message not on queue"); + uint64_t transactionId = 0; + if (t.get() != 0) { + transactionId = t->getId(); + t->enroll(msgId); + } + try { + log.recordDequeue(msgId, queueId, transactionId); + } + catch (...) { + // Undo the record-keeping if the log wasn't written correctly. + if (transactionId != 0) + t->unenroll(msgId); + throw; + } + // Ok, logged successfully. If this is a transactional op, note + // the transaction. If non-transactional, remove the 'where' entry. if (transactionId != 0) { - t->unenroll(msgId); - qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock); - std::vector<MessageInfo::TransType> &oplist = p->transOps[t]; - std::vector<MessageInfo::TransType>::iterator i; - for (i = oplist.begin(); i < oplist.end(); ++i) { - if (*i == MessageInfo::TRANSACTION_DEQUEUE) { - oplist.erase(i); - break; - } - } + i->transaction = t; + i->disposition = MessageInfo::TRANSACTION_DEQUEUE; + } + else { + p->where.erase(i); + // If the message doesn't exist on any other queues, remove it. + if (p->where.empty()) + remove(msgId); } - throw; } - - // If transacted, leave the reference until the transaction commits. 
- if (transactionId == 0) - if (::InterlockedDecrement(&p->enqueuedCount) == 0) - remove(msgId); } // Commit a previous provisional enqueue or dequeue of a particular message @@ -161,22 +158,23 @@ Messages::commit(uint64_t msgId, Transac p = i->second; } { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock); - std::vector<MessageInfo::TransType> &oplist = p->transOps[t]; - std::vector<MessageInfo::TransType>::iterator i; - for (i = oplist.begin(); i < oplist.end(); ++i) { - // Transactional dequeues left the ref count alone until commit - // while transaction enqueues already incremented it. - if (*i == MessageInfo::TRANSACTION_DEQUEUE) - ::InterlockedDecrement(&p->enqueuedCount); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + std::list<MessageInfo::Location>::iterator i; + for (i = p->where.begin(); i != p->where.end(); ++i) { + if (i->transaction != t) + continue; + // Transactional dequeues can now remove the item from the + // where list; enqueues just clear the transaction reference. + if (i->disposition == MessageInfo::TRANSACTION_DEQUEUE) + i = p->where.erase(i); + else + i->transaction.reset(); } - // Remember, last deref of Transaction::shared_ptr deletes Transaction. - p->transOps.erase(t); } // If committing results in this message having no further enqueue // references, delete it. If the delete fails, swallow the exception // and let recovery take care of removing it later. - if (::InterlockedCompareExchange(&p->enqueuedCount, 0, 0) == 0) { + if (p->where.empty()) { try { remove(msgId); } @@ -199,22 +197,28 @@ Messages::abort(uint64_t msgId, Transact p = i->second; } { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->transOpsLock); - std::vector<MessageInfo::TransType> &oplist = p->transOps[t]; - std::vector<MessageInfo::TransType>::iterator i; - for (i = oplist.begin(); i < oplist.end(); ++i) { - // Transactional enqueues incremented the ref count when seen; - // while transaction dequeues left it alone. 
- if (*i == MessageInfo::TRANSACTION_ENQUEUE) - ::InterlockedDecrement(&p->enqueuedCount); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + std::list<MessageInfo::Location>::iterator i = p->where.begin(); + while (i != p->where.end()) { + if (i->transaction != t) { + ++i; + continue; + } + // Aborted transactional dequeues result in the message remaining + // enqueued like before the operation; enqueues clear the + // message from the where list - like the enqueue never happened. + if (i->disposition == MessageInfo::TRANSACTION_ENQUEUE) + i = p->where.erase(i); + else { + i->transaction.reset(); + ++i; + } } - // Remember, last deref of Transaction::shared_ptr deletes Transaction. - p->transOps.erase(t); } - // If committing results in this message having no further enqueue + // If aborting results in this message having no further enqueue // references, delete it. If the delete fails, swallow the exception // and let recovery take care of removing it later. - if (::InterlockedCompareExchange(&p->enqueuedCount, 0, 0) == 0) { + if (p->where.empty()) { try { remove(msgId); } @@ -237,9 +241,10 @@ Messages::loadContent(uint64_t msgId, // the log. void Messages::recover(qpid::broker::RecoveryManager& recoverer, + const std::set<uint64_t> &validQueues, + const std::map<uint64_t, Transaction::shared_ptr>& transMap, qpid::store::MessageMap& messageMap, - qpid::store::MessageQueueMap& messageQueueMap, - const std::map<uint64_t, Transaction::shared_ptr>& transMap) + qpid::store::MessageQueueMap& messageQueueMap) { std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> > messageOps; log.recover(recoverer, messageMap, messageOps); @@ -264,9 +269,16 @@ Messages::recover(qpid::broker::Recovery std::vector<MessageLog::RecoveredMsgOp>::const_iterator op; for (op = ops.begin(); op != ops.end(); ++op) { QueueEntry entry(op->queueId); + MessageInfo::Location loc(op->queueId); std::string dir = op->op == MessageLog::RECOVERED_ENQUEUE ? 
"enqueue" : "dequeue"; + if (validQueues.find(op->queueId) == validQueues.end()) { + QPID_LOG(info, + "Message " << msgId << dir << " on non-existant queue " + << op->queueId << "; dropped"); + continue; + } if (op->txnId != 0) { // Be sure to enroll this message in the transaction even if // it has committed or aborted. This ensures that the @@ -276,22 +288,18 @@ Messages::recover(qpid::broker::Recovery // it couldn't be recovered again. // // Recall what is being reconstructed; 2 things: - // 1. This class's 'messages' list which only keeps track - // of how many queues reference each message (though NOT - // which queues) and the transactions each message is - // enrolled in. For this, aborted transactions cause the - // result of the operation to be ignored, but the - // message does need to be enrolled in the transaction - // to properly maintain the transaction references until - // the message is deleted. - // 2. The StorageProvider's MessageQueueMap, which DOES - // have an entry for each queue each message is on and + // 1. This class's 'messages' list which keeps track + // of the queues each message is on and the transactions + // each message is enrolled in. For this, aborted + // transactions cause the result of the operation to be + // ignored, but the message does need to be enrolled in + // the transaction to properly maintain the transaction + // references until the message is deleted. + // 2. The StorageProvider's MessageQueueMap, which also + // has an entry for each queue each message is on and // its TPL status and associated xid. const Transaction::shared_ptr &t = transMap.find(op->txnId)->second; - // Adds t to map, ensuring a reference to Transaction, even if - // no ops are added to the TransType vector. - std::vector<MessageInfo::TransType>& tOps = m->transOps[t]; // Prepared transactions cause the operation to be // provisionally acted on, and the message to be enrolled in // the transaction for when it commits/aborts. 
This is @@ -304,13 +312,14 @@ Messages::recover(qpid::broker::Recovery THROW_STORE_EXCEPTION("Invalid transaction state"); t->enroll(msgId); entry.xid = tpct->getXid(); + loc.transaction = t; if (op->op == MessageLog::RECOVERED_ENQUEUE) { - tOps.push_back(MessageInfo::TRANSACTION_ENQUEUE); entry.tplStatus = QueueEntry::ADDING; + loc.disposition = MessageInfo::TRANSACTION_ENQUEUE; } else { - tOps.push_back(MessageInfo::TRANSACTION_DEQUEUE); entry.tplStatus = QueueEntry::REMOVING; + loc.disposition = MessageInfo::TRANSACTION_DEQUEUE; } } else if (t->getState() != Transaction::TRANS_COMMITTED) { @@ -329,42 +338,107 @@ Messages::recover(qpid::broker::Recovery // it if the current op is non-transactional; if it's a prepared // transaction then replace the existing entry with the current // one that notes the message is enqueued but being removed under - // a prepared transaciton. + // a prepared transaction. QPID_LOG(debug, dir + " at queue " << entry.queueId); if (op->op == MessageLog::RECOVERED_ENQUEUE) { entries.push_back(entry); + m->where.push_back(loc); } else { std::vector<QueueEntry>::iterator i = entries.begin(); while (i != entries.end()) { if (i->queueId == entry.queueId) { - *i = entry; + if (entry.tplStatus != QueueEntry::NONE) + *i = entry; + else + entries.erase(i); break; } ++i; } + std::list<MessageInfo::Location>::iterator w = m->where.begin(); + while (w != m->where.end()) { + if (w->queueId == loc.queueId) { + if (loc.transaction.get() != 0) + *w = loc; + else + m->where.erase(w); + } + } } } - // Now that all the queue entries have been set correctly, the - // enqueuedCount that MessageInfo keeps track of is simply the - // number of queue map entries. If there are none, add this - // message to the homeless list to be deleted from the log after - // the recovery is done. 
- if ((m->enqueuedCount = entries.size()) == 0) { + // Now that all the queue entries have been set correctly, see if + // there are any entries; they may have all been removed during + // recovery. If there are none, add this message to the homeless + // list to be deleted from the log after the recovery is done. + if (m->where.size() == 0) { homeless.push_back(msgId); messageMap.erase(msgId); messageQueueMap.erase(msgId); } - std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m); - messages.insert(p); + else { + std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m); + messages.insert(p); + } } QPID_LOG(debug, "Message log recovery done."); // Done! Ok, go back and delete all the homeless messages. - for (std::vector<uint64_t>::iterator i = homeless.begin(); - i != homeless.end(); - ++i) { - QPID_LOG(debug, "Deleting homeless message " << *i); - remove(*i); + BOOST_FOREACH(uint64_t msg, homeless) { + QPID_LOG(debug, "Deleting homeless message " << msg); + remove(msg); + } +} + +// Expunge is called when a queue is deleted. All references to that +// queue must be expunged from all messages. 'Dequeue' log records are +// written for each queue entry removed, but any errors are swallowed. +// On recovery there's a list of valid queues passed in. The deleted +// queue will not be on that list so if any references to it are +// recovered they'll get weeded out then. +void +Messages::expunge(uint64_t queueId) +{ + std::vector<uint64_t> toBeDeleted; // Messages to be deleted later. + + { + // Lock everybody out since all messages are possibly in play. + // There also may be other threads already working on particular + // messages so individual message mutex still must be acquired. 
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock); + MessageMap::iterator m; + for (m = messages.begin(); m != messages.end(); ++m) { + MessageInfo::shared_ptr p = m->second; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> ml(p->whereLock); + std::list<MessageInfo::Location>::iterator i = p->where.begin(); + while (i != p->where.end()) { + if (i->queueId != queueId) { + ++i; + continue; + } + // If this entry is involved in a transaction, unenroll it. + // Then remove the entry. + if (i->transaction.get() != 0) + i->transaction->unenroll(m->first); + i = p->where.erase(i); + try { + log.recordDequeue(m->first, queueId, 0); + } + catch(...) { + } + } + if (p->where.size() == 0) + toBeDeleted.push_back(m->first); + } + } + } + // Swallow any exceptions during this; don't care. Recover it later + // if needed. + try { + BOOST_FOREACH(uint64_t msg, toBeDeleted) + remove(msg); + } + catch(...) { } } Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h?rev=1031325&r1=1031324&r2=1031325&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h Thu Nov 4 22:52:10 2010 @@ -24,6 +24,7 @@ #include <windows.h> #include <map> +#include <set> #include <vector> #include <boost/intrusive_ptr.hpp> #include <qpid/broker/PersistableMessage.h> @@ -46,9 +47,38 @@ class Messages { // Keep a list of transactional operations this message is // referenced in. When the transaction changes/finalizes these all // need to be acted on. 
- typedef enum { TRANSACTION_ENQUEUE, TRANSACTION_DEQUEUE } TransType; + typedef enum { TRANSACTION_NONE = 0, + TRANSACTION_ENQUEUE, + TRANSACTION_DEQUEUE } TransType; +#if 0 std::map<Transaction::shared_ptr, std::vector<TransType> > transOps; qpid::sys::Mutex transOpsLock; +#endif + // Think what I need is a list of "where is this message" - queue id, + // transaction ref, what kind of trans op (enq/deq). Then "remove all + // queue refs" can search through all messages looking for queue ids + // and undo them. Write "remove from queue" record to log. Also need to + // add "remove from queue" to recovery. + struct Location { + uint64_t queueId; + Transaction::shared_ptr transaction; + TransType disposition; + + Location(uint64_t q) + : queueId(q), transaction(), disposition(TRANSACTION_NONE) {} + Location(uint64_t q, Transaction::shared_ptr& t, TransType d) + : queueId(q), transaction(t), disposition(d) {} + }; + qpid::sys::Mutex whereLock; + std::list<Location> where; + // The transactions vector just keeps a shared_ptr to each + // Transaction this message was involved in, regardless of the + // disposition or transaction state. Keeping a valid shared_ptr + // prevents the Transaction from being deleted. As long as there + // are any messages that referred to a transaction, that + // transaction's state needs to be known so the message disposition + // can be correctly recovered if needed. + std::vector<Transaction::shared_ptr> transactions; typedef boost::shared_ptr<MessageInfo> shared_ptr; @@ -96,12 +126,17 @@ public: uint64_t offset, uint32_t length); + // Expunge is called when a queue is deleted. All references to that + // queue must be expunged from all messages. + void expunge(uint64_t queueId); + // Recover the current set of messages and where they're queued from // the log. 
void recover(qpid::broker::RecoveryManager& recoverer, + const std::set<uint64_t> &validQueues, + const std::map<uint64_t, Transaction::shared_ptr>& transMap, qpid::store::MessageMap& messageMap, - qpid::store::MessageQueueMap& messageQueueMap, - const std::map<uint64_t, Transaction::shared_ptr>& transMap); + qpid::store::MessageQueueMap& messageQueueMap); }; }}} // namespace qpid::store::ms_clfs --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org