Author: shuston Date: Fri Oct 29 16:58:59 2010 New Revision: 1028840 URL: http://svn.apache.org/viewvc?rev=1028840&view=rev Log: Since LSNs are used as persistence IDs and ID 0 usually means "not persisted", ensure that no log record that's used gets written at LSN 0. Add stub for loadContent(). Correct transaction references in enqueue/dequeue.
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp Fri Oct 29 16:58:59 2010 @@ -62,6 +62,9 @@ Log::open(const std::string& path, const ULONG infoSize = sizeof(info); BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize); QPID_WINDOWS_CHECK_NOT(ok, 0); + // If this is the first time this log is opened, give an opportunity to + // initialize its content. 
+ bool needInitialize(false); if (info.TotalContainers == 0) { std::vector<const std::wstring> paths; LPWSTR cPaths[1024]; @@ -82,6 +85,7 @@ Log::open(const std::string& path, const cPaths, NULL); QPID_WINDOWS_CHECK_NOT(ok, 0); + needInitialize = true; } // Need a marshaling area ok = ::CreateLogMarshallingArea(handle, @@ -91,6 +95,8 @@ Log::open(const std::string& path, const 1, // Max read buffers &marshal); QPID_WINDOWS_CHECK_NOT(ok, 0); + if (needInitialize) + initialize(); } uint32_t Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h?rev=1028840&r1=1028839&r2=1028840&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h Fri Oct 29 16:58:59 2010 @@ -44,6 +44,11 @@ protected: std::string logPath; PVOID marshal; + // Give subclasses a chance to initialize a new log. Called after a new + // log is created, initial set of containers is added, and marshalling + // area is allocated. + virtual void initialize() {} + public: struct TuningParameters { size_t containerSize; Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Fri Oct 29 16:58:59 2010 @@ -572,10 +572,10 @@ MSSqlClfsProvider::destroy(PersistableQu queues.erase(queues.find(qId)); } // Now tell each of the messages they are less one queue commitment. - // Can I call dequeue()? Or some sub-piece of that? 
Transaction::shared_ptr nonTransactional; BOOST_FOREACH(uint64_t msgId, affectedMessages) { - messages.dequeue(msgId, qId, nonTransactional); + QPID_LOG(debug, "Removing message " << msgId); + messages.dequeue(msgId, qId, nonTransactional); } } @@ -827,20 +827,9 @@ MSSqlClfsProvider::loadContent(const qpi uint64_t offset, uint32_t length) { -#if 0 - // SQL store keeps all messages in one table, so we don't need the + // Message log keeps all messages in one log, so we don't need the // queue reference. - DatabaseConnection *db = initConnection(); - MessageRecordset rsMessages; - try { - rsMessages.open(db, TblMessage); - rsMessages.loadContent(msg, data, offset, length); - } - catch(_com_error &e) { - std::string errs = db->getErrors(); - throw ADOException("Error loading message content", e, errs); - } -#endif + messages.loadContent(msg->getPersistenceId(), data, offset, length); } /** @@ -858,9 +847,15 @@ MSSqlClfsProvider::enqueue(qpid::broker: const PersistableQueue& queue) { Transaction::shared_ptr t; - TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt); - if (ctx != 0) + TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt); + if (ctx) t = ctx->getTransaction(); + else { + TPCTransactionContext *tctx; + tctx = dynamic_cast<TPCTransactionContext*>(ctxt); + if (tctx) + t = tctx->getTransaction(); + } uint64_t qId = queue.getPersistenceId(); uint64_t msgId = msg->getPersistenceId(); QueueContents::shared_ptr q; @@ -898,9 +893,15 @@ MSSqlClfsProvider::dequeue(qpid::broker: const PersistableQueue& queue) { Transaction::shared_ptr t; - TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt); - if (ctx != 0) + TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt); + if (ctx) t = ctx->getTransaction(); + else { + TPCTransactionContext *tctx; + tctx = dynamic_cast<TPCTransactionContext*>(ctxt); + if (tctx) + t = tctx->getTransaction(); + } uint64_t qId = queue.getPersistenceId(); uint64_t msgId = 
msg->getPersistenceId(); QueueContents::shared_ptr q; Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp Fri Oct 29 16:58:59 2010 @@ -49,12 +49,18 @@ struct MessageStart { MessageEntryType type; // If the complete message encoding doesn't fit, remainder is in // MessageChunk records to follow. + // headerLength is the size of the message's header in content. It is + // part of the totalLength and the segmentLength. + uint32_t headerLength; uint32_t totalLength; uint32_t segmentLength; char content[MaxMessageContentLength]; MessageStart() - : type(MessageStartEntry), totalLength(0), segmentLength(0) {} + : type(MessageStartEntry), + headerLength(0), + totalLength(0), + segmentLength(0) {} }; // Message-Chunk struct MessageChunk { @@ -95,6 +101,16 @@ namespace qpid { namespace store { namespace ms_clfs { +void +MessageLog::initialize() +{ + // Write something to occupy the first record, preventing a real message + // from being lsn/id 0. Delete of a nonexistent id is easily tossed + // during recovery if no other messages have caused the tail to be moved + // up past this dummy record by then. + deleteMessage(0, 0); +} + uint32_t MessageLog::marshallingBufferSize() { @@ -114,8 +130,10 @@ MessageLog::add(const boost::intrusive_p // Message-Chunk records to contain the rest. If it does all fit in one // record, though, optimize the encoding by going straight to the // Message-Start record rather than encoding then copying to the record. 
+ // In all case MessageStart entry; uint32_t encodedMessageLength = msg->encodedSize(); + entry.headerLength = msg->encodedHeaderSize(); entry.totalLength = encodedMessageLength; CLFS_LSN location, lastChunkLsn; std::auto_ptr<char> encodeStage; @@ -165,6 +183,16 @@ MessageLog::deleteMessage(uint64_t messa moveTail(idToLsn(newFirstId)); } +// Load part or all of a message's content from previously stored +// log record(s). +void +MessageLog::loadContent(uint64_t messageId, + std::string& data, + uint64_t offset, + uint32_t length) +{ +} + void MessageLog::recordEnqueue (uint64_t messageId, uint64_t queueId, @@ -202,9 +230,11 @@ MessageLog::recover(qpid::broker::Recove std::map<uint64_t, MessageBlocks> reassemblies; std::map<uint64_t, MessageBlocks>::iterator at; - // Note that there may be message refs in the log which are deleted, so - // be sure to only add msgs at message-start record, and ignore those - // that don't have an existing message record. + QPID_LOG(debug, "Recovering message log"); + + // Note that there may be message refs in the log which are deleted, so + // be sure to only add msgs at message-start record, and ignore those + // that don't have an existing message record. // Get the base LSN - that's how to say "start reading at the beginning" CLFS_INFORMATION info; ULONG infoLength = sizeof (info); @@ -253,11 +283,20 @@ MessageLog::recover(qpid::broker::Recove // this content off to the side until the remaining record(s) are // located. if (start->totalLength == start->segmentLength) { // Whole thing - qpid::framing::Buffer buff(start->content, start->totalLength); + // Start by recovering the header then see if the rest of + // the content is desired. 
+ qpid::framing::Buffer buff(start->content, start->headerLength); qpid::broker::RecoverableMessage::shared_ptr m = recoverer.recoverMessage(buff); m->setPersistenceId(msgId); messageMap[msgId] = m; + uint32_t contentLength = + start->totalLength - start->headerLength; + if (m->loadContent(contentLength)) { + qpid::framing::Buffer content(&(start->content[start->headerLength]), + contentLength); + m->decodeContent(content); + } } else { // Save it in a block big enough. @@ -310,7 +349,7 @@ MessageLog::recover(qpid::broker::Recove enqueue = reinterpret_cast<MessageEnqueue *>(recordPointer); msgId = lsnToId(messageLsn); QPID_LOG(debug, "Message " << msgId << " Enqueue on queue " << - enqueue->queueId); + enqueue->queueId << ", txn " << enqueue->transId); if (messageMap.find(msgId) == messageMap.end()) { QPID_LOG(debug, "Message " << msgId << " doesn't exist; discarded"); @@ -357,8 +396,10 @@ MessageLog::recover(qpid::broker::Recove } DWORD status = ::GetLastError(); ::TerminateReadLog(readContext); - if (status == ERROR_HANDLE_EOF) // No more records + if (status == ERROR_HANDLE_EOF) { // No more records + QPID_LOG(debug, "Message log recovered"); return; + } throw QPID_WINDOWS_ERROR(status); } Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h?rev=1028840&r1=1028839&r2=1028840&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h Fri Oct 29 16:58:59 2010 @@ -41,6 +41,11 @@ namespace ms_clfs { */ class MessageLog : public Log { +protected: + // Message log needs to have a no-op first record written in the log + // to ensure that no real message gets an ID 0. + virtual void initialize(); + public: // Inherited and reimplemented from Log. 
Figure the minimum marshalling // buffer size needed for the records this class writes. @@ -53,6 +58,13 @@ public: // the earliest valid message in the log, so move the tail up to it. void deleteMessage(uint64_t messageId, uint64_t newFirstId); + // Load part or all of a message's content from previously stored + // log record(s). + void loadContent(uint64_t messageId, + std::string& data, + uint64_t offset, + uint32_t length); + // Enqueue and dequeue operations track messages' transit across // queues; each operation may be associated with a transaction. If // the transactionId is 0 the operation is not associated with a Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp Fri Oct 29 16:58:59 2010 @@ -222,6 +222,17 @@ Messages::abort(uint64_t msgId, Transact } } +// Load part or all of a message's content from previously stored +// log record(s). +void +Messages::loadContent(uint64_t msgId, + std::string& data, + uint64_t offset, + uint32_t length) +{ + log.loadContent(msgId, data, offset, length); +} + // Recover the current set of messages and where they're queued from // the log. 
void @@ -247,7 +258,7 @@ Messages::recover(qpid::broker::Recovery for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) { uint64_t msgId = msg->first; const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second; - QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " ops"); + QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)"); MessageInfo::shared_ptr m(new MessageInfo); std::vector<QueueEntry>& entries = messageQueueMap[msgId]; std::vector<MessageLog::RecoveredMsgOp>::const_iterator op; Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h?rev=1028840&r1=1028839&r2=1028840&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h Fri Oct 29 16:58:59 2010 @@ -52,8 +52,7 @@ class Messages { typedef boost::shared_ptr<MessageInfo> shared_ptr; - MessageInfo() - : enqueuedCount(0) { /*latestLsn.Internal = 0;*/ } + MessageInfo() : enqueuedCount(0) {} }; qpid::sys::RWlock lock; @@ -90,6 +89,13 @@ public: // being removed from all queues, it is deleted. void abort(uint64_t msgId, Transaction::shared_ptr& transaction); + // Load part or all of a message's content from previously stored + // log record(s). + void loadContent(uint64_t msgId, + std::string& data, + uint64_t offset, + uint32_t length); + // Recover the current set of messages and where they're queued from // the log. 
void recover(qpid::broker::RecoveryManager& recoverer, Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp Fri Oct 29 16:58:59 2010 @@ -99,6 +99,16 @@ namespace qpid { namespace store { namespace ms_clfs { +void +TransactionLog::initialize() +{ + // Write something to occupy the first record, preventing a real + // transaction from being lsn/id 0. Delete of a nonexistent id is easily + // tossed during recovery if no other transactions have caused the tail + // to be moved up past this dummy record by then. + deleteTransaction(0); +} + uint32_t TransactionLog::marshallingBufferSize() { @@ -226,7 +236,6 @@ TransactionLog::deleteTransaction(uint64 write(&deleteEntry, sizeof(deleteEntry), &transLsn); if (newFirstId != 0) moveTail(idToLsn(newFirstId)); - } void @@ -252,6 +261,8 @@ TransactionLog::collectPreparedXids(std: void TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap) { + QPID_LOG(debug, "Recovering transaction log"); + // Note that there may be transaction refs in the log which are deleted, // so be sure to only add transactions at Start records, and ignore those // that don't have an existing message record. @@ -378,6 +389,8 @@ TransactionLog::recover(std::map<uint64_ if (status != ERROR_HANDLE_EOF) // No more records throw QPID_WINDOWS_ERROR(status); + QPID_LOG(debug, "Transaction log recovered"); + // At this point we have a list of all the not-deleted transactions that // were in existence when the broker last ran. All transactions of both // Dtx and Tx types that haven't prepared or committed will be aborted. 
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h?rev=1028840&r1=1028839&r2=1028840&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h Fri Oct 29 16:58:59 2010 @@ -58,6 +58,12 @@ class TransactionLog : public Log, std::map<uint64_t, boost::weak_ptr<Transaction> > validIds; qpid::sys::Mutex idsLock; +protected: + // Transaction log needs to have a no-op first record written in the log + // to ensure that no real transaction gets an ID 0; messages think trans + // id 0 means "no transaction." + virtual void initialize(); + public: // Inherited and reimplemented from Log. Figure the minimum marshalling // buffer size needed for the records this class writes. --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org