Author: shuston
Date: Fri Oct 29 16:58:59 2010
New Revision: 1028840

URL: http://svn.apache.org/viewvc?rev=1028840&view=rev
Log:
Since LSNs are used as persistence IDs and ID 0 usually means "not persisted", 
ensure that no log record that's used gets written at LSN 0.
Add stub for loadContent().
Correct transaction references in enqueue/dequeue.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp Fri Oct 29 16:58:59 2010
@@ -62,6 +62,9 @@ Log::open(const std::string& path, const
     ULONG infoSize = sizeof(info);
     BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize);
     QPID_WINDOWS_CHECK_NOT(ok, 0);
+    // If this is the first time this log is opened, give an opportunity to
+    // initialize its content.
+    bool needInitialize(false);
     if (info.TotalContainers == 0) {
         std::vector<const std::wstring> paths;
         LPWSTR cPaths[1024];
@@ -82,6 +85,7 @@ Log::open(const std::string& path, const
                                   cPaths,
                                   NULL);
         QPID_WINDOWS_CHECK_NOT(ok, 0);
+        needInitialize = true;
     }
     // Need a marshaling area
     ok = ::CreateLogMarshallingArea(handle,
@@ -91,6 +95,8 @@ Log::open(const std::string& path, const
                                     1,                   // Max read buffers
                                     &marshal);
     QPID_WINDOWS_CHECK_NOT(ok, 0);
+    if (needInitialize)
+        initialize();
 }
 
 uint32_t

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h?rev=1028840&r1=1028839&r2=1028840&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h Fri Oct 29 16:58:59 2010
@@ -44,6 +44,11 @@ protected:
     std::string logPath;
     PVOID marshal;
 
+    // Give subclasses a chance to initialize a new log. Called after a new
+    // log is created, initial set of containers is added, and marshalling
+    // area is allocated.
+    virtual void initialize() {}
+
 public:
     struct TuningParameters {
         size_t containerSize;

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Fri Oct 29 
16:58:59 2010
@@ -572,10 +572,10 @@ MSSqlClfsProvider::destroy(PersistableQu
         queues.erase(queues.find(qId));
     }
     // Now tell each of the messages they are less one queue commitment.
-    // Can I call dequeue()? Or some sub-piece of that?
     Transaction::shared_ptr nonTransactional;
     BOOST_FOREACH(uint64_t msgId, affectedMessages) {
-      messages.dequeue(msgId, qId, nonTransactional);
+        QPID_LOG(debug, "Removing message " << msgId);
+        messages.dequeue(msgId, qId, nonTransactional);
     }
 }
 
@@ -827,20 +827,9 @@ MSSqlClfsProvider::loadContent(const qpi
                                uint64_t offset,
                                uint32_t length)
 {
-#if 0
-    // SQL store keeps all messages in one table, so we don't need the
+    // Message log keeps all messages in one log, so we don't need the
     // queue reference.
-    DatabaseConnection *db = initConnection();
-    MessageRecordset rsMessages;
-    try {
-        rsMessages.open(db, TblMessage);
-        rsMessages.loadContent(msg, data, offset, length);
-    }
-    catch(_com_error &e) {
-        std::string errs = db->getErrors();
-        throw ADOException("Error loading message content", e, errs);
-    }
-#endif
+    messages.loadContent(msg->getPersistenceId(), data, offset, length);
 }
 
 /**
@@ -858,9 +847,15 @@ MSSqlClfsProvider::enqueue(qpid::broker:
                            const PersistableQueue& queue)
 {
     Transaction::shared_ptr t;
-    TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt);
-    if (ctx != 0)
+    TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt);
+    if (ctx)
         t = ctx->getTransaction();
+    else {
+        TPCTransactionContext *tctx;
+        tctx = dynamic_cast<TPCTransactionContext*>(ctxt);
+        if (tctx)
+            t = tctx->getTransaction();
+    }
     uint64_t qId = queue.getPersistenceId();
     uint64_t msgId = msg->getPersistenceId();
     QueueContents::shared_ptr q;
@@ -898,9 +893,15 @@ MSSqlClfsProvider::dequeue(qpid::broker:
                            const PersistableQueue& queue)
 {
     Transaction::shared_ptr t;
-    TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt);
-    if (ctx != 0)
+    TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt);
+    if (ctx)
         t = ctx->getTransaction();
+    else {
+        TPCTransactionContext *tctx;
+        tctx = dynamic_cast<TPCTransactionContext*>(ctxt);
+        if (tctx)
+            t = tctx->getTransaction();
+    }
     uint64_t qId = queue.getPersistenceId();
     uint64_t msgId = msg->getPersistenceId();
     QueueContents::shared_ptr q;

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp Fri Oct 29 
16:58:59 2010
@@ -49,12 +49,18 @@ struct MessageStart {
     MessageEntryType type;
     // If the complete message encoding doesn't fit, remainder is in
     // MessageChunk records to follow.
+    // headerLength is the size of the message's header in content. It is
+    // part of the totalLength and the segmentLength.
+    uint32_t headerLength;
     uint32_t totalLength;
     uint32_t segmentLength;
     char content[MaxMessageContentLength];
 
     MessageStart()
-        : type(MessageStartEntry), totalLength(0), segmentLength(0) {}
+      : type(MessageStartEntry),
+        headerLength(0),
+        totalLength(0),
+        segmentLength(0) {}
 };
 // Message-Chunk
 struct MessageChunk {
@@ -95,6 +101,16 @@ namespace qpid {
 namespace store {
 namespace ms_clfs {
 
+void
+MessageLog::initialize()
+{
+    // Write something to occupy the first record, preventing a real message
+    // from being lsn/id 0. Delete of a non-existant id is easily tossed
+    // during recovery if no other messages have caused the tail to be moved
+    // up past this dummy record by then.
+    deleteMessage(0, 0);
+}
+
 uint32_t
 MessageLog::marshallingBufferSize()
 {
@@ -114,8 +130,10 @@ MessageLog::add(const boost::intrusive_p
     // Message-Chunk records to contain the rest. If it does all fit in one
     // record, though, optimize the encoding by going straight to the
     // Message-Start record rather than encoding then copying to the record.
+    // In all case
     MessageStart entry;
     uint32_t encodedMessageLength = msg->encodedSize();
+    entry.headerLength = msg->encodedHeaderSize();
     entry.totalLength = encodedMessageLength;
     CLFS_LSN location, lastChunkLsn;
     std::auto_ptr<char> encodeStage;
@@ -165,6 +183,16 @@ MessageLog::deleteMessage(uint64_t messa
         moveTail(idToLsn(newFirstId));
 }
 
+// Load part or all of a message's content from previously stored
+// log record(s).
+void
+MessageLog::loadContent(uint64_t messageId,
+                        std::string& data,
+                        uint64_t offset,
+                        uint32_t length)
+{
+}
+
 void
 MessageLog::recordEnqueue (uint64_t messageId,
                            uint64_t queueId,
@@ -202,9 +230,11 @@ MessageLog::recover(qpid::broker::Recove
     std::map<uint64_t, MessageBlocks> reassemblies;
     std::map<uint64_t, MessageBlocks>::iterator at;
 
-    //   Note that there may be message refs in the log which are deleted, so
-    //   be sure to only add msgs at message-start record, and ignore those
-    //   that don't have an existing message record.
+    QPID_LOG(debug, "Recovering message log");
+
+    // Note that there may be message refs in the log which are deleted, so
+    // be sure to only add msgs at message-start record, and ignore those
+    // that don't have an existing message record.
     // Get the base LSN - that's how to say "start reading at the beginning"
     CLFS_INFORMATION info;
     ULONG infoLength = sizeof (info);
@@ -253,11 +283,20 @@ MessageLog::recover(qpid::broker::Recove
             // this content off to the side until the remaining record(s) are
             // located.
             if (start->totalLength == start->segmentLength) {  // Whole thing
-                qpid::framing::Buffer buff(start->content, start->totalLength);
+                // Start by recovering the header then see if the rest of
+                // the content is desired.
+                qpid::framing::Buffer buff(start->content, 
start->headerLength);
                 qpid::broker::RecoverableMessage::shared_ptr m =
                     recoverer.recoverMessage(buff);
                 m->setPersistenceId(msgId);
                 messageMap[msgId] = m;
+                uint32_t contentLength =
+                    start->totalLength - start->headerLength;
+                if (m->loadContent(contentLength)) {
+                    qpid::framing::Buffer 
content(&(start->content[start->headerLength]),
+                                                  contentLength);
+                    m->decodeContent(content);
+                }
             }
             else {
                 // Save it in a block big enough.
@@ -310,7 +349,7 @@ MessageLog::recover(qpid::broker::Recove
             enqueue = reinterpret_cast<MessageEnqueue *>(recordPointer);
             msgId = lsnToId(messageLsn);
             QPID_LOG(debug, "Message " << msgId << " Enqueue on queue " <<
-                            enqueue->queueId);
+                            enqueue->queueId << ", txn " << enqueue->transId);
             if (messageMap.find(msgId) == messageMap.end()) {
                 QPID_LOG(debug,
                          "Message " << msgId << " doesn't exist; discarded");
@@ -357,8 +396,10 @@ MessageLog::recover(qpid::broker::Recove
     }
     DWORD status = ::GetLastError();
     ::TerminateReadLog(readContext);
-    if (status == ERROR_HANDLE_EOF)  // No more records
+    if (status == ERROR_HANDLE_EOF) { // No more records
+        QPID_LOG(debug, "Message log recovered");
         return;
+    }
     throw QPID_WINDOWS_ERROR(status);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h?rev=1028840&r1=1028839&r2=1028840&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h Fri Oct 29 16:58:59 
2010
@@ -41,6 +41,11 @@ namespace ms_clfs {
  */
 class MessageLog : public Log {
 
+protected:
+    // Message log needs to have a no-op first record written in the log
+    // to ensure that no real message gets an ID 0.
+    virtual void initialize();
+
 public:
     // Inherited and reimplemented from Log. Figure the minimum marshalling
     // buffer size needed for the records this class writes.
@@ -53,6 +58,13 @@ public:
     // the earliest valid message in the log, so move the tail up to it.
     void deleteMessage(uint64_t messageId, uint64_t newFirstId);
 
+    // Load part or all of a message's content from previously stored
+    // log record(s).
+    void loadContent(uint64_t messageId,
+                     std::string& data,
+                     uint64_t offset,
+                     uint32_t length);
+
     // Enqueue and dequeue operations track messages' transit across
     // queues; each operation may be associated with a transaction. If
     // the transactionId is 0 the operation is not associated with a

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp Fri Oct 29 16:58:59 
2010
@@ -222,6 +222,17 @@ Messages::abort(uint64_t msgId, Transact
     }
 }
 
+// Load part or all of a message's content from previously stored
+// log record(s).
+void
+Messages::loadContent(uint64_t msgId,
+                      std::string& data,
+                      uint64_t offset,
+                      uint32_t length)
+{
+    log.loadContent(msgId, data, offset, length);
+}
+
 // Recover the current set of messages and where they're queued from
 // the log.
 void
@@ -247,7 +258,7 @@ Messages::recover(qpid::broker::Recovery
     for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) {
         uint64_t msgId = msg->first;
         const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second;
-        QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " ops");
+        QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)");
         MessageInfo::shared_ptr m(new MessageInfo);
         std::vector<QueueEntry>& entries = messageQueueMap[msgId];
         std::vector<MessageLog::RecoveredMsgOp>::const_iterator op;

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h?rev=1028840&r1=1028839&r2=1028840&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Messages.h Fri Oct 29 16:58:59 
2010
@@ -52,8 +52,7 @@ class Messages {
 
         typedef boost::shared_ptr<MessageInfo> shared_ptr;
 
-        MessageInfo()
-            : enqueuedCount(0) { /*latestLsn.Internal = 0;*/ }
+        MessageInfo() : enqueuedCount(0) {}
     };
 
     qpid::sys::RWlock lock;
@@ -90,6 +89,13 @@ public:
     // being removed from all queues, it is deleted.
     void abort(uint64_t msgId, Transaction::shared_ptr& transaction);
 
+    // Load part or all of a message's content from previously stored
+    // log record(s).
+    void loadContent(uint64_t msgId,
+                     std::string& data,
+                     uint64_t offset,
+                     uint32_t length);
+
     // Recover the current set of messages and where they're queued from
     // the log.
     void recover(qpid::broker::RecoveryManager& recoverer,

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp?rev=1028840&r1=1028839&r2=1028840&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp Fri Oct 29 
16:58:59 2010
@@ -99,6 +99,16 @@ namespace qpid {
 namespace store {
 namespace ms_clfs {
 
+void
+TransactionLog::initialize()
+{
+    // Write something to occupy the first record, preventing a real
+    // transaction from being lsn/id 0. Delete of a non-existant id is easily
+    // tossed during recovery if no other transactions have caused the tail
+    // to be moved up past this dummy record by then.
+    deleteTransaction(0);
+}
+
 uint32_t
 TransactionLog::marshallingBufferSize()
 {
@@ -226,7 +236,6 @@ TransactionLog::deleteTransaction(uint64
     write(&deleteEntry, sizeof(deleteEntry), &transLsn);
     if (newFirstId != 0)
         moveTail(idToLsn(newFirstId));
-
 }
 
 void
@@ -252,6 +261,8 @@ TransactionLog::collectPreparedXids(std:
 void
 TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap)
 {
+    QPID_LOG(debug, "Recovering transaction log");
+
     // Note that there may be transaction refs in the log which are deleted,
     // so be sure to only add transactions at Start records, and ignore those
     // that don't have an existing message record.
@@ -378,6 +389,8 @@ TransactionLog::recover(std::map<uint64_
     if (status != ERROR_HANDLE_EOF)  // No more records
         throw QPID_WINDOWS_ERROR(status);
 
+    QPID_LOG(debug, "Transaction log recovered");
+
     // At this point we have a list of all the not-deleted transactions that
     // were in existence when the broker last ran. All transactions of both
     // Dtx and Tx types that haven't prepared or committed will be aborted.

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h?rev=1028840&r1=1028839&r2=1028840&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h Fri Oct 29 
16:58:59 2010
@@ -58,6 +58,12 @@ class TransactionLog : public Log,
     std::map<uint64_t, boost::weak_ptr<Transaction> > validIds;
     qpid::sys::Mutex idsLock;
 
+protected:
+    // Transaction log needs to have a no-op first record written in the log
+    // to ensure that no real transaction gets an ID 0; messages think trans
+    // id 0 means "no transaction."
+    virtual void initialize();
+
 public:
     // Inherited and reimplemented from Log. Figure the minimum marshalling
     // buffer size needed for the records this class writes.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to