On 02/08/2014 11:24 AM, Nikolai Gorchilov wrote:

> Unfortunately, the rock store issue I've found isn't related to the r13201
> regression, contrary to what Amos presumed.
> 
> It's still present in both the stable (3.4.x) and development
> (3.HEAD) branches.

Indeed. If you can, please try the attached trunk patch. It should fix
one old Rock bug exposed by your excellent debugging logs, but I have
not tried to actually reproduce the issue and check that (a) the patch
works and (b) there are no other bugs affecting your test case.


Thank you.

Alex.

Fixed stalled concurrent rock store reads by ensuring their ID uniqueness.

Added a check to prevent similar bugs from occurring in the future.

=== modified file 'src/DiskIO/IpcIo/IpcIoFile.cc'
--- src/DiskIO/IpcIo/IpcIoFile.cc	2014-01-27 05:27:41 +0000
+++ src/DiskIO/IpcIo/IpcIoFile.cc	2014-02-21 05:18:15 +0000
@@ -290,85 +290,90 @@ IpcIoFile::writeCompleted(WriteRequest *
         (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
 
     if (!ioError) {
         debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
                diskId << " at " << writeRequest->offset);
     }
 
     const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
     const int errflag = ioError ? DISK_ERROR :DISK_OK;
     ioRequestor->writeCompleted(errflag, rlen, writeRequest);
 }
 
 bool
 IpcIoFile::ioInProgress() const
 {
     return !olderRequests->empty() || !newerRequests->empty();
 }
 
 /// track a new pending request
 void
-IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending)
+IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
 {
-    newerRequests->insert(std::make_pair(lastRequestId, pending));
+    const std::pair<RequestMap::iterator,bool> result =
+        newerRequests->insert(std::make_pair(id, pending));
+    Must(result.second); // failures means that id was not unique
     if (!timeoutCheckScheduled)
         scheduleTimeoutCheck();
 }
 
 /// push an I/O request to disker
 void
 IpcIoFile::push(IpcIoPendingRequest *const pending)
 {
     // prevent queue overflows: check for responses to earlier requests
+    // warning: this call may result in indirect push() recursion
     HandleResponses("before push");
 
     debugs(47, 7, HERE);
     Must(diskId >= 0);
     Must(pending);
     Must(pending->readRequest || pending->writeRequest);
 
     IpcIoMsg ipcIo;
     try {
+        if (++lastRequestId == 0) // don't use zero value as requestId
+            ++lastRequestId;
         ipcIo.requestId = lastRequestId;
         ipcIo.start = current_time;
         if (pending->readRequest) {
             ipcIo.command = IpcIo::cmdRead;
             ipcIo.offset = pending->readRequest->offset;
             ipcIo.len = pending->readRequest->len;
         } else { // pending->writeRequest
             Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
             if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
                 ipcIo.len = 0;
                 throw TexcHere("run out of shared memory pages for IPC I/O");
             }
             ipcIo.command = IpcIo::cmdWrite;
             ipcIo.offset = pending->writeRequest->offset;
             ipcIo.len = pending->writeRequest->len;
             char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
             memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
         }
 
         debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
 
         if (queue->push(diskId, ipcIo))
             Notify(diskId); // must notify disker
-        trackPendingRequest(pending);
+        trackPendingRequest(ipcIo.requestId, pending);
     } catch (const Queue::Full &) {
         debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
                SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
         // TODO: grow queue size
 
         pending->completeIo(NULL);
         delete pending;
     } catch (const TextException &e) {
         debugs(47, DBG_IMPORTANT, HERE << e.what());
         pending->completeIo(NULL);
         delete pending;
     }
 }
 
 /// whether we think there is enough time to complete the I/O
 bool
 IpcIoFile::canWait() const
 {
     if (!config.ioTimeout)
         return true; // no timeout specified
@@ -592,43 +597,40 @@ IpcIoFile::getFD() const
 }
 
 /* IpcIoMsg */
 
 IpcIoMsg::IpcIoMsg():
         requestId(0),
         offset(0),
         len(0),
         command(IpcIo::cmdNone),
         xerrno(0)
 {
     start.tv_sec = 0;
     start.tv_usec = 0;
 }
 
 /* IpcIoPendingRequest */
 
 IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
         file(aFile), readRequest(NULL), writeRequest(NULL)
 {
-    Must(file != NULL);
-    if (++file->lastRequestId == 0) // don't use zero value as requestId
-        ++file->lastRequestId;
 }
 
 void
 IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
 {
     if (readRequest)
         file->readCompleted(readRequest, response);
     else if (writeRequest)
         file->writeCompleted(writeRequest, response);
     else {
         Must(!response); // only timeouts are handled here
         file->openCompleted(NULL);
     }
 }
 
 /* XXX: disker code that should probably be moved elsewhere */
 
 static int TheFile = -1; ///< db file descriptor
 
 static void

=== modified file 'src/DiskIO/IpcIo/IpcIoFile.h'
--- src/DiskIO/IpcIo/IpcIoFile.h	2013-10-25 00:13:46 +0000
+++ src/DiskIO/IpcIo/IpcIoFile.h	2014-02-21 05:17:46 +0000
@@ -68,41 +68,41 @@ public:
     virtual bool canRead() const;
     virtual bool canWrite() const;
     virtual bool ioInProgress() const;
 
     /// handle open response from coordinator
     static void HandleOpenResponse(const Ipc::StrandSearchResponse &response);
 
     /// handle queue push notifications from worker or disker
     static void HandleNotification(const Ipc::TypedMsgHdr &msg);
 
     DiskFile::Config config; ///< supported configuration options
 
 protected:
     friend class IpcIoPendingRequest;
     void openCompleted(const Ipc::StrandSearchResponse *const response);
     void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response);
     void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
     bool canWait() const;
 
 private:
-    void trackPendingRequest(IpcIoPendingRequest *const pending);
+    void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending);
     void push(IpcIoPendingRequest *const pending);
     IpcIoPendingRequest *dequeueRequest(const unsigned int requestId);
 
     static void Notify(const int peerId);
 
     static void OpenTimeout(void *const param);
     static void CheckTimeouts(void *const param);
     void checkTimeouts();
     void scheduleTimeoutCheck();
 
     static void HandleResponses(const char *const when);
     void handleResponse(IpcIoMsg &ipcIo);
 
     static void DiskerHandleMoreRequests(void*);
     static void DiskerHandleRequests();
     static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
     static bool WaitBeforePop();
 
 private:
     const String dbName; ///< the name of the file we are managing

Reply via email to