On 02/08/2014 11:24 AM, Nikolai Gorchilov wrote: > Unfortunately, the rock store issue I've found isn't related to r13201 > regression as presumed by Amos. > > It's still available with both stable (3.4.x) and development > (3.HEAD) branches.
Indeed. If you can, please try the attached trunk patch. It should fix one old Rock bug exposed by your excellent debugging logs, but I have not tried to actually reproduce the issue and check that (a) the patch works and (b) there are no other bugs affecting your test case. Thank you. Alex.
Fixed stalled concurrent rock store reads by ensuring their ID uniqueness. Added a check to prevent similar bugs from occurring in the future. === modified file 'src/DiskIO/IpcIo/IpcIoFile.cc' --- src/DiskIO/IpcIo/IpcIoFile.cc 2014-01-27 05:27:41 +0000 +++ src/DiskIO/IpcIo/IpcIoFile.cc 2014-02-21 05:18:15 +0000 @@ -290,85 +290,90 @@ IpcIoFile::writeCompleted(WriteRequest * (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API? if (!ioError) { debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" << diskId << " at " << writeRequest->offset); } const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len; const int errflag = ioError ? DISK_ERROR :DISK_OK; ioRequestor->writeCompleted(errflag, rlen, writeRequest); } bool IpcIoFile::ioInProgress() const { return !olderRequests->empty() || !newerRequests->empty(); } /// track a new pending request void -IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending) +IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending) { - newerRequests->insert(std::make_pair(lastRequestId, pending)); + const std::pair<RequestMap::iterator,bool> result = + newerRequests->insert(std::make_pair(id, pending)); + Must(result.second); // failures means that id was not unique if (!timeoutCheckScheduled) scheduleTimeoutCheck(); } /// push an I/O request to disker void IpcIoFile::push(IpcIoPendingRequest *const pending) { // prevent queue overflows: check for responses to earlier requests + // warning: this call may result in indirect push() recursion HandleResponses("before push"); debugs(47, 7, HERE); Must(diskId >= 0); Must(pending); Must(pending->readRequest || pending->writeRequest); IpcIoMsg ipcIo; try { + if (++lastRequestId == 0) // don't use zero value as requestId + ++lastRequestId; ipcIo.requestId = lastRequestId; ipcIo.start = current_time; if (pending->readRequest) { ipcIo.command = IpcIo::cmdRead; ipcIo.offset = pending->readRequest->offset; ipcIo.len = 
pending->readRequest->len; } else { // pending->writeRequest Must(pending->writeRequest->len <= Ipc::Mem::PageSize()); if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) { ipcIo.len = 0; throw TexcHere("run out of shared memory pages for IPC I/O"); } ipcIo.command = IpcIo::cmdWrite; ipcIo.offset = pending->writeRequest->offset; ipcIo.len = pending->writeRequest->len; char *const buf = Ipc::Mem::PagePointer(ipcIo.page); memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away } debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId)); if (queue->push(diskId, ipcIo)) Notify(diskId); // must notify disker - trackPendingRequest(pending); + trackPendingRequest(ipcIo.requestId, pending); } catch (const Queue::Full &) { debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " << SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len // TODO: grow queue size pending->completeIo(NULL); delete pending; } catch (const TextException &e) { debugs(47, DBG_IMPORTANT, HERE << e.what()); pending->completeIo(NULL); delete pending; } } /// whether we think there is enough time to complete the I/O bool IpcIoFile::canWait() const { if (!config.ioTimeout) return true; // no timeout specified @@ -592,43 +597,40 @@ IpcIoFile::getFD() const } /* IpcIoMsg */ IpcIoMsg::IpcIoMsg(): requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0) { start.tv_sec = 0; start.tv_usec = 0; } /* IpcIoPendingRequest */ IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): file(aFile), readRequest(NULL), writeRequest(NULL) { - Must(file != NULL); - if (++file->lastRequestId == 0) // don't use zero value as requestId - ++file->lastRequestId; } void IpcIoPendingRequest::completeIo(IpcIoMsg *const response) { if (readRequest) file->readCompleted(readRequest, response); else if (writeRequest) file->writeCompleted(writeRequest, response); else { Must(!response); // only timeouts are handled here file->openCompleted(NULL); } } /* 
XXX: disker code that should probably be moved elsewhere */ static int TheFile = -1; ///< db file descriptor static void === modified file 'src/DiskIO/IpcIo/IpcIoFile.h' --- src/DiskIO/IpcIo/IpcIoFile.h 2013-10-25 00:13:46 +0000 +++ src/DiskIO/IpcIo/IpcIoFile.h 2014-02-21 05:17:46 +0000 @@ -68,41 +68,41 @@ public: virtual bool canRead() const; virtual bool canWrite() const; virtual bool ioInProgress() const; /// handle open response from coordinator static void HandleOpenResponse(const Ipc::StrandSearchResponse &response); /// handle queue push notifications from worker or disker static void HandleNotification(const Ipc::TypedMsgHdr &msg); DiskFile::Config config; ///< supported configuration options protected: friend class IpcIoPendingRequest; void openCompleted(const Ipc::StrandSearchResponse *const response); void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response); void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response); bool canWait() const; private: - void trackPendingRequest(IpcIoPendingRequest *const pending); + void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending); void push(IpcIoPendingRequest *const pending); IpcIoPendingRequest *dequeueRequest(const unsigned int requestId); static void Notify(const int peerId); static void OpenTimeout(void *const param); static void CheckTimeouts(void *const param); void checkTimeouts(); void scheduleTimeoutCheck(); static void HandleResponses(const char *const when); void handleResponse(IpcIoMsg &ipcIo); static void DiskerHandleMoreRequests(void*); static void DiskerHandleRequests(); static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); static bool WaitBeforePop(); private: const String dbName; ///< the name of the file we are managing