---
src/DiskIO/IpcIo/IpcIoFile.cc | 2 +-
src/ipc/Queue.h | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git src/DiskIO/IpcIo/IpcIoFile.cc src/DiskIO/IpcIo/IpcIoFile.cc
index aec7b40..337fdb9 100644
--- src/DiskIO/IpcIo/IpcIoFile.cc
+++ src/DiskIO/IpcIo/IpcIoFile.cc
@@ -340,41 +340,41 @@ IpcIoFile::push(IpcIoPendingRequest *const pending)
SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
// TODO: grow queue size
pending->completeIo(NULL);
delete pending;
} catch (const TextException &e) {
debugs(47, DBG_IMPORTANT, HERE << e.what());
pending->completeIo(NULL);
delete pending;
}
}
/// whether we think there is enough time to complete the I/O
bool
IpcIoFile::canWait() const
{
if (!config.ioTimeout)
return true; // no timeout specified
IpcIoMsg oldestIo;
- if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
+ if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
return true; // we cannot estimate expected wait time; assume it is OK
const int expectedWait = tvSubMsec(oldestIo.start, current_time);
if (expectedWait < 0 ||
static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
return true; // expected wait time is acceptible
debugs(47,2, HERE << "cannot wait: " << expectedWait <<
" oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
return false; // do not want to wait that long
}
/// called when coordinator responds to worker open request
void
IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
{
debugs(47, 7, HERE << "coordinator response to open request");
for (IpcIoFileList::iterator i = WaitingForOpen.begin();
i != WaitingForOpen.end(); ++i) {
if (response.strand.tag == (*i)->dbName) {
diff --git src/ipc/Queue.h src/ipc/Queue.h
index 2098610..72fe3e5 100644
--- src/ipc/Queue.h
+++ src/ipc/Queue.h
@@ -181,43 +181,43 @@ public:
Mem::Owner<QueueReaders> *const readersOwner;
};
static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
enum Group { groupA = 0, groupB = 1 };
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
Group localGroup() const { return theLocalGroup; }
Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
/// clears the reader notification received by the local process from the remote process
void clearReaderSignal(const int remoteProcessId);
/// picks a process and calls OneToOneUniQueue::pop() using its queue
template <class Value> bool pop(int &remoteProcessId, Value &value);
/// calls OneToOneUniQueue::push() using the given process queue
template <class Value> bool push(const int remoteProcessId, const Value &value);
- // TODO: rename to findOldest() or some such
- /// calls OneToOneUniQueue::peek() using the given process queue
- template<class Value> bool peek(const int remoteProcessId, Value &value) const;
+ /// finds the oldest item in incoming and outgoing queues between
+ /// us and the given remote process
+ template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
/// returns true if pop() would have probably succeeded but does not pop()
bool popReady() const;
/// returns local reader's balance
QueueReader::Balance &localBalance();
/// returns local reader's rate limit
QueueReader::Rate &localRateLimit();
private:
bool validProcessId(const Group group, const int processId) const;
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
QueueReader &reader(const Group group, const int processId);
const QueueReader &reader(const Group group, const int processId) const;
int readerIndex(const Group group, const int processId) const;
int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
@@ -333,41 +333,41 @@ FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
remoteProcessId = theLastPopProcessId;
debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
return true;
}
}
return false; // no process had anything to pop
}
template <class Value>
bool
FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
{
OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
return remoteQueue.push(value, &remoteReader);
}
template <class Value>
bool
-FewToFewBiQueue::peek(const int remoteProcessId, Value &value) const
+FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
{
// we may be called before remote process configured its queue end
if (!validProcessId(remoteGroup(), remoteProcessId))
return false;
// we need the oldest value, so start with the incoming, them-to-us queue:
const OneToOneUniQueue &inQueue = oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId);
debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << inQueue.size());
if (inQueue.peek(value))
return true;
// if the incoming queue is empty, check the outgoing, us-to-them queue:
const OneToOneUniQueue &outQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << outQueue.size());
return outQueue.peek(value);
}
} // namespace Ipc
#endif // SQUID_IPC_QUEUE_H