Performance fix: Check half-closed descriptors at most once per second.

A few revisions back, comm checked half-closed descriptors once per
second, but the code was buggy. I replaced it with simpler code that
checked each half-closed descriptor whenever the OS would mark it as
ready for reading. That was a bad idea: The checks wasted a lot of CPU
cycles because half-closed descriptors are usually ready for reading all
the time.

This revision resurrects the 1 check/sec limit, but hopefully with fewer
bugs. In my limited tests, CPU usage seems to be back to normal.


Added a DescriptorSet class to manage an unordered collection of unique
descriptors. The class might be useful for deferred reads as well, but
that remains to be seen.

The DescriptorSet class has O(1) complexity for search, insertion,
and deletion. It uses about 2*sizeof(int)*MaxFD bytes. The splay tree
previously used to store half-closed descriptors uses less RAM for a small
number of descriptors but has O(log n) complexity.

The DescriptorSet code should probably get its own .h and .cc files,
especially if it is going to be used by deferred reads.

Thank you,

Alex.

Performance fix: Check half-closed descriptors at most once per second.

A few revisions back, comm checked half-closed descriptors once per second,
but the code was buggy. I replaced it with simpler code that checked each
half-closed descriptor whenever the OS would mark it as ready for reading.
That was a bad idea: The checks wasted a lot of CPU cycles because half-closed
descriptors are usually ready for reading all the time.

This revision resurrects the 1 check/sec limit, but hopefully with fewer bugs. In
my limited tests, CPU usage seems to be back to normal.


Added a DescriptorSet class to manage an unordered collection of unique
descriptors. The class might be useful for deferred reads as well, but that
remains to be seen.

The DescriptorSet class has O(1) complexity for search, insertion,
and deletion. It uses about 2*sizeof(int)*MaxFD bytes. The splay tree previously
used to store half-closed descriptors uses less RAM for a small number of
descriptors but has O(log n) complexity.

=== modified file 'src/comm.cc'
--- src/comm.cc	2008-09-22 21:56:44 +0000
+++ src/comm.cc	2008-09-24 23:09:33 +0000
@@ -224,6 +224,144 @@
     CBDATA_CLASS(ConnectStateData);
 };
 
+/// an unordered collection of unique descriptors with O(1) complexity
+class DescriptorSet {
+// \todo: Should we use std::set<int> with its flexibility? Our implementation
+// has constant overhead, which is smaller than log(n) of std::set.
+public:
+    // for STL compatibility, should we decide to switch to std::set or similar
+    typedef const int *const_iterator;
+
+    DescriptorSet();
+    ~DescriptorSet();
+
+    /// checks whether fd is in the set; out-of-range fds are never in the set
+    bool has(const int fd) const { return 0 <= fd && fd < capacity_ &&
+        index_[fd] >= 0; }
+
+    bool add(int fd); ///< adds if unique; returns true if added
+    bool del(int fd); ///< deletes if there; returns true if deleted
+    int pop(); ///< deletes and returns one descriptor, in unspecified order
+
+    bool empty() const { return !size_; } ///< true if the set has no descriptors
+
+    /// begin iterator a la STL; may become invalid if the object is modified
+    const_iterator begin() const { return descriptors_; }
+    /// end iterator a la STL; may become invalid if the object is modified
+    const_iterator end() const { return begin() + size_; }
+
+    void print(std::ostream &os) const;
+
+private:
+    // these would be easy to support when needed; prohibit for now
+    DescriptorSet(const DescriptorSet &s); // declared but undefined
+    DescriptorSet &operator =(const DescriptorSet &s); // declared, undefined
+
+    int *descriptors_; ///< stored fds, packed in [0, size_), in unspecified order
+    int *index_; ///< maps fd to its position in descriptors_; -1 if fd is absent
+    int capacity_; ///< number of available descriptor slots
+    int size_; ///< number of descriptors in the set
+};
+
+inline std::ostream &
+operator <<(std::ostream &os, const DescriptorSet &ds)
+{
+    ds.print(os); // the output format is defined by DescriptorSet::print()
+    return os;
+}
+
+static DescriptorSet *TheHalfClosed = NULL; ///< the set of half-closed FDs
+static bool WillCheckHalfClosed = false; ///< true if check is scheduled
+static EVH commHalfClosedCheck;
+static void commPlanHalfClosedCheck();
+
+DescriptorSet::DescriptorSet(): descriptors_(NULL), index_(NULL),
+    capacity_(0), size_(0)
+{
+    // we allocate once and never realloc, at least for now
+    capacity_ = Squid_MaxFD; // bounds both the accepted fd values and the set size
+    descriptors_ = new int[capacity_];
+    index_ = new int[capacity_];
+
+    // fill index with -1s to be able to say whether a descriptor is present
+    // it is not essential to fill the descriptors, but it enables more checks
+    for (int i = 0; i < capacity_; ++i)
+        index_[i] = descriptors_[i] = -1;
+}
+
+DescriptorSet::~DescriptorSet()
+{
+    delete[] descriptors_; // both arrays were allocated by the constructor
+    delete[] index_;
+}
+
+/// adds if unique; returns true if added
+bool
+DescriptorSet::add(int fd)
+{
+    assert(0 <= fd && fd < capacity_); // \todo: replace with Must()
+
+    if (has(fd))
+        return false; // already have it
+
+    assert(size_ < capacity_); // \todo: replace with Must()
+    const int pos = size_++; // append right after the last stored descriptor
+    index_[fd] = pos;
+    descriptors_[pos] = fd;
+    return true; // really added
+}
+
+/// deletes if there; returns true if deleted
+bool
+DescriptorSet::del(int fd)
+{
+    assert(0 <= fd && fd < capacity_); // \todo: here and below, use Must()
+
+    if (!has(fd))
+        return false; // we do not have it
+
+    assert(!empty());
+    const int delPos = index_[fd];
+    assert(0 <= delPos && delPos < capacity_);
+
+    // move the last descriptor to the deleted fd position
+    // to avoid skipping deleted descriptors in pop()
+    const int lastPos = size_-1;
+    const int lastFd = descriptors_[lastPos];
+    assert(delPos <= lastPos); // may be the same
+    descriptors_[delPos] = lastFd;
+    index_[lastFd] = delPos;
+
+    descriptors_[lastPos] = -1;
+    index_[fd] = -1; // must follow the index_[lastFd] update: lastFd may be fd
+    --size_;
+
+    return true; // really deleted
+}
+
+/// ejects one descriptor in unspecified order; the set must not be empty
+int
+DescriptorSet::pop()
+{
+    assert(!empty());
+    const int lastPos =--size_; // shrink first; the old last slot is ejected
+    const int lastFd = descriptors_[lastPos];
+    assert(0 <= lastFd && lastFd < capacity_);
+
+    // cleanup
+    descriptors_[lastPos] = -1;
+    index_[lastFd] = -1;
+
+    return lastFd;
+}
+
+void
+DescriptorSet::print(std::ostream &os) const
+{
+    // TODO: add "name" if the set is used for more than just half-closed FDs
+    os << size_ << " FDs"; // reports the count only, e.g., "3 FDs"
+}
+
 /* STATIC */
 
 static comm_err_t commBind(int s, struct addrinfo &);
@@ -347,7 +485,7 @@
     // Active/passive conflicts are OK and simply cancel passive monitoring.
     if (ccb->active()) {
         // if the assertion below fails, we have an active comm_read conflict
-        assert(commHasHalfClosedMonitor(fd));
+        assert(fd_table[fd].halfClosedReader != NULL);
         commStopHalfClosedMonitor(fd);
         assert(!ccb->active());
     }
@@ -1591,6 +1729,9 @@
     startParams.fd = fd;
     ScheduleCallHere(startCall);
 
+    // a half-closed fd may lack a reader, so we stop monitoring explicitly
+    if (commHasHalfClosedMonitor(fd))
+        commStopHalfClosedMonitor(fd);
     commSetTimeout(fd, -1, NULL, NULL);
 
     // notify read/write handlers
@@ -1928,10 +2069,15 @@
     RESERVED_FD = XMIN(100, Squid_MaxFD / 4);
 
     conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler));
+
+    TheHalfClosed = new DescriptorSet;
 }
 
 void
 comm_exit(void) {
+    delete TheHalfClosed;
+    TheHalfClosed = NULL;
+
     safe_free(fd_table);
     safe_free(fdd_table);
     if (fdc_table) {
@@ -2395,37 +2541,65 @@
 // will close the connection on read errors.
 void
 commStartHalfClosedMonitor(int fd) {
+    debugs(5, 5, HERE << "adding FD " << fd << " to " << *TheHalfClosed);
     assert(isOpen(fd));
     assert(!commHasHalfClosedMonitor(fd));
-
-    AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader",
-	CommIoCbPtrFun(&commHalfClosedReader, NULL));
-    comm_read(fd, NULL, 0, call);
+    (void)TheHalfClosed->add(fd); // could also assert the result
+    commPlanHalfClosedCheck(); // may schedule check if we added the first FD
+}
+
+static
+void
+commPlanHalfClosedCheck()
+{
+    if (!WillCheckHalfClosed && !TheHalfClosed->empty()) { // at most one pending check
+        eventAdd("commHalfClosedCheck", &commHalfClosedCheck, NULL, 1.0, 1);
+        WillCheckHalfClosed = true; // reset by commHalfClosedCheck() when it runs
+    }
+}
+
+/// iterates over all descriptors that may need half-closed tests and
+/// calls comm_read for those that do; re-schedules the check if needed
+static
+void
+commHalfClosedCheck(void *) {
+    debugs(5, 5, HERE << "checking " << *TheHalfClosed);
+
+    typedef DescriptorSet::const_iterator DSCI;
+    const DSCI end = TheHalfClosed->end();
+    for (DSCI i = TheHalfClosed->begin(); i != end; ++i) {
+        const int fd = *i;
+        if (!fd_table[fd].halfClosedReader) { // not reading already
+            AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader",
+                CommIoCbPtrFun(&commHalfClosedReader, NULL));
+            comm_read(fd, NULL, 0, call);
+            fd_table[fd].halfClosedReader = call; // kept so the read can be canceled
+        }
+    }
+
+    WillCheckHalfClosed = false; // as far as we know
+    commPlanHalfClosedCheck(); // may need to check again
 }
 
 /// checks whether we are waiting for possibly half-closed connection to close
 // We are monitoring if the read handler for the fd is the monitoring handler.
 bool
 commHasHalfClosedMonitor(int fd) {
-    assert(isOpen(fd));
-
-    if (const comm_io_callback_t *cb = COMMIO_FD_READCB(fd)) {
-	AsyncCall::Pointer call = cb->callback;
-	if (call != NULL) {
-	    // check whether the callback has the right type (it should)
-	    // and uses commHalfClosedReader as the address to call back
-            typedef CommIoCbPtrFun IoDialer;
-	    if (IoDialer *d = dynamic_cast<IoDialer*>(call->getDialer()))
-	        return d->handler == &commHalfClosedReader;
-	}
-    }
-    return false;
+    return TheHalfClosed->has(fd); // monitored iff fd is in the half-closed set
 }
 
 /// stop waiting for possibly half-closed connection to close
 static void
 commStopHalfClosedMonitor(int const fd) {
-    comm_read_cancel(fd, &commHalfClosedReader, NULL);
+    debugs(5, 5, HERE << "removing FD " << fd << " from " << *TheHalfClosed);
+
+    // cancel the read if one was scheduled
+    AsyncCall::Pointer reader = fd_table[fd].halfClosedReader;
+    if (reader != NULL)
+        comm_read_cancel(fd, reader);
+    fd_table[fd].halfClosedReader = NULL;
+
+    TheHalfClosed->del(fd); // result ignored; del() returns false if fd is not in the set
 }
 
 /// I/O handler for the possibly half-closed connection monitoring code
@@ -2433,6 +2607,9 @@
 commHalfClosedReader(int fd, char *, size_t size, comm_err_t flag, int, void *) {
     // there cannot be more data coming in on half-closed connections
     assert(size == 0); 
+    assert(commHasHalfClosedMonitor(fd)); // or we would have canceled the read
+
+    fd_table[fd].halfClosedReader = NULL; // done reading, for now
 
     // nothing to do if fd is being closed
     if (flag == COMM_ERR_CLOSING)
@@ -2446,7 +2623,7 @@
     }
 
     // continue waiting for close or error
-    commStartHalfClosedMonitor(fd);
+    commPlanHalfClosedCheck(); // make sure this fd will be checked again
 }
 
 

=== modified file 'src/fde.h'
--- src/fde.h	2008-09-11 05:58:32 +0000
+++ src/fde.h	2008-09-24 21:14:03 +0000
@@ -101,6 +101,7 @@
     time_t timeout;
     void *lifetime_data;
     AsyncCall::Pointer closeHandler;
+    AsyncCall::Pointer halfClosedReader; /// read handler for half-closed fds
     CommWriteStateData *wstate;         /* State data for comm_write */
     READ_HANDLER *read_method;
     WRITE_HANDLER *write_method;
@@ -121,6 +122,7 @@
     inline void clear() {
         timeoutHandler = NULL;
         closeHandler = NULL;
+        halfClosedReader = NULL;
         // XXX: the following memset may corrupt or leak new or changed members
         memset(this, 0, sizeof(fde));
         local_addr.SetEmpty(); // IPAddress likes to be setup nicely.

Reply via email to