The attached patch contains the cleanup-comm branch changes to ListenStateData, which turn it into an AsyncJob (renamed Comm::TcpAcceptor) that uses Subscriptions to generate calls.

This fixes the call re-use problem of bug 3081 and a related FTP data connection bug.

Alex:
Please test that this does not break SMP support. I've run it with 2 workers on a non-SMP machine, but have not checked anything more complex.

NP: there are still FTP problems with EPRT/PORT handling of data connections from servers which do not terminate their directory transfer with \r\n. That is outside the scope of this patch.

Amos
=== modified file 'src/CommCalls.cc'
--- src/CommCalls.cc	2009-07-12 22:56:47 +0000
+++ src/CommCalls.cc	2011-01-09 01:36:41 +0000
@@ -130,6 +130,12 @@
 {
 }
 
+CommAcceptCbPtrFun::CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o):
+        CommDialerParamsT<CommAcceptCbParams>(o.params),
+        handler(o.handler)
+{
+}
+
 void
 CommAcceptCbPtrFun::dial()
 {

=== modified file 'src/CommCalls.h'
--- src/CommCalls.h	2010-08-24 00:12:54 +0000
+++ src/CommCalls.h	2011-01-09 04:56:21 +0000
@@ -176,8 +176,11 @@
 {
 public:
     typedef CommAcceptCbParams Params;
+    typedef RefCount<CommAcceptCbPtrFun> Pointer;
 
     CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
+    CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o);
+
     void dial();
 
     virtual void print(std::ostream &os) const;
@@ -259,11 +262,17 @@
 class CommCbFunPtrCallT: public AsyncCall
 {
 public:
+    typedef RefCount<CommCbFunPtrCallT<Dialer> > Pointer;
     typedef typename Dialer::Params Params;
 
     inline CommCbFunPtrCallT(int debugSection, int debugLevel,
                              const char *callName, const Dialer &aDialer);
 
+    inline CommCbFunPtrCallT(const Pointer &p) :
+            AsyncCall(p->debugSection, p->debugLevel, p->name),
+            dialer(p->dialer)
+        {}
+
     virtual CallDialer* getDialer() { return &dialer; }
 
 public:

=== modified file 'src/ProtoPort.cc'
--- src/ProtoPort.cc	2010-11-18 08:01:53 +0000
+++ src/ProtoPort.cc	2011-01-09 00:58:29 +0000
@@ -3,15 +3,17 @@
  */
 
 #include "squid.h"
+#include "comm.h"
 #include "ProtoPort.h"
 #if HAVE_LIMITS
 #include <limits>
 #endif
 
-http_port_list::http_port_list(const char *aProtocol)
+http_port_list::http_port_list(const char *aProtocol) :
+        listenFd(-1)
 #if USE_SSL
-        :
-        http(*this), dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
+        , http(*this)
+        , dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
 #endif
 {
     protocol = xstrdup(aProtocol);
@@ -19,7 +21,10 @@
 
 http_port_list::~http_port_list()
 {
-    delete listener;
+    if (listenFd >= 0) {
+        comm_close(listenFd);
+        listenFd = -1;
+    }
 
     safe_free(name);
     safe_free(defaultsite);

=== modified file 'src/ProtoPort.h'
--- src/ProtoPort.h	2010-11-18 08:01:53 +0000
+++ src/ProtoPort.h	2011-01-09 00:55:34 +0000
@@ -4,9 +4,7 @@
 #ifndef SQUID_PROTO_PORT_H
 #define SQUID_PROTO_PORT_H
 
-//#include "typedefs.h"
 #include "cbdata.h"
-#include "comm/ListenStateData.h"
 
 #if USE_SSL
 #include "ssl/gadgets.h"
@@ -43,11 +41,11 @@
     } tcp_keepalive;
 
     /**
-     * The FD listening socket handler.
-     * If not NULL we are actively listening for client requests.
-     * delete to close the socket.
+     * The FD listening socket.
+     * If >= 0 we are actively listening for client requests.
+     * use comm_close(listenFd) to stop.
      */
-    Comm::ListenStateData *listener;
+    int listenFd;
 
 #if USE_SSL
     // XXX: temporary hack to ease move of SSL options to http_port

=== modified file 'src/base/AsyncCall.h'
--- src/base/AsyncCall.h	2010-12-02 23:33:27 +0000
+++ src/base/AsyncCall.h	2011-01-09 04:58:01 +0000
@@ -45,6 +45,8 @@
     friend class AsyncCallQueue;
 
     AsyncCall(int aDebugSection, int aDebugLevel, const char *aName);
+    AsyncCall();
+    AsyncCall(const AsyncCall &);
     virtual ~AsyncCall();
 
     void make(); // fire if we can; handles general call debugging
@@ -122,6 +124,10 @@
                const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName),
             dialer(aDialer) {}
 
+    AsyncCallT(const RefCount<AsyncCallT<Dialer> > &o):
+            AsyncCall(o->debugSection, o->debugLevel, o->name),
+            dialer(o->dialer) {}
+
     CallDialer *getDialer() { return &dialer; }
 
 protected:

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2010-12-16 01:15:12 +0000
+++ src/client_side.cc	2011-01-10 00:46:31 +0000
@@ -92,8 +92,9 @@
 #include "ClientRequestContext.h"
 #include "clientStream.h"
 #include "comm.h"
+#include "CommCalls.h"
 #include "comm/Write.h"
-#include "comm/ListenStateData.h"
+#include "comm/TcpAcceptor.h"
 #include "base/TextException.h"
 #include "ConnectionDetail.h"
 #include "eui/Config.h"
@@ -135,13 +136,13 @@
 #define comm_close comm_lingering_close
 #endif
 
-/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call
+/// dials clientListenerConnectionOpened call
 class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
 {
 public:
-    typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg);
-    ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg):
-            handler(aHandler), portCfg(aPortCfg) {}
+    typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg, bool uses_ssl);
+    ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg, bool aSslFlag):
+            handler(aHandler), portCfg(aPortCfg), uses_ssl(aSslFlag) {}
 
     virtual void print(std::ostream &os) const {
         startPrint(os) <<
@@ -149,20 +150,17 @@
     }
 
     virtual bool canDial(AsyncCall &) const { return true; }
-    virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); }
+    virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg, uses_ssl); }
 
 public:
     Handler handler;
 
 private:
     http_port_list *portCfg; ///< from Config.Sockaddr.http
+    bool uses_ssl;
 };
 
-
-static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s);
-#if USE_SSL
-static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s);
-#endif
+static void clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl);
 
 /* our socket-related context */
 
@@ -3122,14 +3120,15 @@
 
 /** Handle a new connection on HTTP socket. */
 void
-httpAccept(int sock, int newfd, ConnectionDetail *details,
-           comm_err_t flag, int xerrno, void *data)
+httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
 {
     http_port_list *s = (http_port_list *)data;
     ConnStateData *connState = NULL;
 
     if (flag != COMM_OK) {
-        debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+        // This should not occur with TcpAcceptor.
+        // However it's possible the call was still queued when the client disconnected
+        debugs(33, 1, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
         return;
     }
 
@@ -3368,15 +3367,16 @@
 
 /** handle a new HTTPS connection */
 static void
-httpsAccept(int sock, int newfd, ConnectionDetail *details,
-            comm_err_t flag, int xerrno, void *data)
+httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
 {
     https_port_list *s = (https_port_list *)data;
     SSL_CTX *sslContext = s->staticSslContext.get();
 
     if (flag != COMM_OK) {
+        // This should not occur with TcpAcceptor.
+        // However it's possible the call was still queued when the client disconnected
         errno = xerrno;
-        debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+        debugs(33, 1, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
         return;
     }
 
@@ -3625,13 +3625,16 @@
         const int openFlags = COMM_NONBLOCKING |
                               (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
 
-        AsyncCall::Pointer callback = asyncCall(33,2,
-                                                "clientHttpConnectionOpened",
-                                                ListeningStartedDialer(&clientHttpConnectionOpened, s));
-        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags,
-                            Ipc::fdnHttpSocket, callback);
-
-        HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened
+        // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+        typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+        RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s));
+        Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+
+        AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened",
+                                                  ListeningStartedDialer(&clientListenerConnectionOpened, s, false));
+        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall, sub);
+
+        HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened
     }
 
 #if USE_SSL
@@ -3644,27 +3647,24 @@
 
 /// process clientHttpConnectionsOpen result
 static void
-clientHttpConnectionOpened(int fd, int, http_port_list *s)
+clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl)
 {
-    if (!OpenedHttpSocket(fd, "Cannot open HTTP Port"))
+    s->listenFd = fd;
+    if (!OpenedHttpSocket(s->listenFd, (uses_ssl?"Cannot open HTTPS Port":"Cannot open HTTP Port")))
         return;
 
     Must(s);
-
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
-                                         CommAcceptCbPtrFun(httpAccept, s));
-
-    s->listener = new Comm::ListenStateData(fd, call, true);
-
-    debugs(1, 1, "Accepting " <<
+    Must(s->listenFd >= 0);
+
+    debugs(1, 1, "Accepting" <<
            (s->intercepted ? " intercepted" : "") <<
            (s->spoof_client_ip ? " spoofing" : "") <<
            (s->sslBump ? " bumpy" : "") <<
            (s->accel ? " accelerated" : "")
-           << " HTTP connections at " << s->s
-           << ", FD " << fd << "." );
+           << " HTTP" << (uses_ssl?"S":"") << " connections at "
+           << " FD " << s->listenFd << " on " << s->s);
 
-    Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
+    Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for
 }
 
 #if USE_SSL
@@ -3686,35 +3686,22 @@
             continue;
         }
 
-        AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened",
-                                            ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http));
-
-        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING,
-                            Ipc::fdnHttpsSocket, call);
+        const int openFlags = COMM_NONBLOCKING |
+                              (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
+
+        // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS
+        typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+        RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s));
+        Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+
+        AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened",
+                                                  ListeningStartedDialer(&clientListenerConnectionOpened, &s->http, true));
+
+        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall, sub);
 
         HttpSockets[NHttpSockets++] = -1;
     }
 }
-
-/// process clientHttpsConnectionsOpen result
-static void
-clientHttpsConnectionOpened(int fd, int, http_port_list *s)
-{
-    if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port"))
-        return;
-
-    Must(s);
-
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
-                                         CommAcceptCbPtrFun(httpsAccept, s));
-
-    s->listener = new Comm::ListenStateData(fd, call, true);
-
-    debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
-
-    Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
-}
-
 #endif
 
 void
@@ -3733,19 +3720,19 @@
 clientHttpConnectionsClose(void)
 {
     for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
-        if (s->listener) {
-            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection");
-            delete s->listener;
-            s->listener = NULL;
+        if (s->listenFd >= 0) {
+            debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection");
+            comm_close(s->listenFd);
+            s->listenFd = -1;
         }
     }
 
 #if USE_SSL
     for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
-        if (s->listener) {
-            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection");
-            delete s->listener;
-            s->listener = NULL;
+        if (s->listenFd >= 0) {
+            debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection");
+            comm_close(s->listenFd);
+            s->listenFd = -1;
         }
     }
 #endif

=== modified file 'src/comm.cc'
--- src/comm.cc	2010-12-06 20:06:08 +0000
+++ src/comm.cc	2011-01-09 10:11:24 +0000
@@ -41,7 +41,7 @@
 #include "comm/comm_internal.h"
 #include "comm/IoCallback.h"
 #include "comm/Write.h"
-#include "comm/ListenStateData.h"
+#include "comm/TcpAcceptor.h"
 #include "CommIO.h"
 #include "CommRead.h"
 #include "ConnectionDetail.h"
@@ -143,7 +143,7 @@
 bool
 isOpen(const int fd)
 {
-    return fd_table[fd].flags.open != 0;
+    return fd >= 0 && fd_table[fd].flags.open != 0;
 }
 
 /**

=== modified file 'src/comm/AcceptLimiter.cc'
--- src/comm/AcceptLimiter.cc	2009-12-31 02:35:01 +0000
+++ src/comm/AcceptLimiter.cc	2011-01-10 00:52:51 +0000
@@ -1,6 +1,6 @@
 #include "config.h"
 #include "comm/AcceptLimiter.h"
-#include "comm/ListenStateData.h"
+#include "comm/TcpAcceptor.h"
 #include "fde.h"
 
 Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
@@ -11,7 +11,7 @@
 }
 
 void
-Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd)
 {
     afd->isLimited++;
     debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
@@ -19,14 +19,33 @@
 }
 
 void
+Comm::AcceptLimiter::removeDead(const Comm::TcpAcceptor *afd)
+{
+    for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) {
+        if (deferred[i] == afd) {
+            deferred[i]->isLimited--;
+            deferred[i] = NULL; // fast. kick() will skip empty entries later.
+            debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+        }
+    }
+}
+
+void
 Comm::AcceptLimiter::kick()
 {
+    // TODO: this could be optimized further with an iterator to search
+    //       looking for first non-NULL, followed by dumping the first N
+    //       with only one shift()/pop_front operation
+
     debugs(5, 5, HERE << " size=" << deferred.size());
-    if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
-        debugs(5, 5, HERE << " doing one.");
+    while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
         /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
-        ListenStateData *temp = deferred.shift();
-        temp->isLimited--;
-        temp->acceptNext();
+        TcpAcceptor *temp = deferred.shift();
+        if (temp != NULL) {
+            debugs(5, 5, HERE << " doing one.");
+            temp->isLimited--;
+            temp->acceptNext();
+            break;
+        }
     }
 }

=== modified file 'src/comm/AcceptLimiter.h'
--- src/comm/AcceptLimiter.h	2010-01-13 01:13:17 +0000
+++ src/comm/AcceptLimiter.h	2011-01-08 14:09:20 +0000
@@ -6,7 +6,7 @@
 namespace Comm
 {
 
-class ListenStateData;
+class TcpAcceptor;
 
 /**
  * FIFO Queue holding listener socket handlers which have been activated
@@ -25,7 +25,10 @@
     static AcceptLimiter &Instance();
 
     /** delay accepting a new client connection. */
-    void defer(Comm::ListenStateData *afd);
+    void defer(Comm::TcpAcceptor *afd);
+
+    /** remove all records of an acceptor. Only to be called by TcpAcceptor::swanSong() */
+    void removeDead(const Comm::TcpAcceptor *afd);
 
     /** try to accept and begin processing any delayed client connections. */
     void kick();
@@ -34,7 +37,7 @@
     static AcceptLimiter Instance_;
 
     /** FIFO queue */
-    Vector<Comm::ListenStateData*> deferred;
+    Vector<Comm::TcpAcceptor*> deferred;
 };
 
 }; // namepace Comm

=== modified file 'src/comm/Makefile.am'
--- src/comm/Makefile.am	2010-11-27 01:46:22 +0000
+++ src/comm/Makefile.am	2011-01-08 11:40:46 +0000
@@ -7,8 +7,8 @@
 libcomm_la_SOURCES= \
 	AcceptLimiter.cc \
 	AcceptLimiter.h \
-	ListenStateData.cc \
-	ListenStateData.h \
+	TcpAcceptor.cc \
+	TcpAcceptor.h \
 	\
 	IoCallback.cc \
 	IoCallback.h \

=== renamed file 'src/comm/ListenStateData.cc' => 'src/comm/TcpAcceptor.cc'
--- src/comm/ListenStateData.cc	2010-08-10 03:11:19 +0000
+++ src/comm/TcpAcceptor.cc	2011-01-09 09:51:14 +0000
@@ -33,15 +33,98 @@
  */
 
 #include "squid.h"
+#include "base/TextException.h"
 #include "CommCalls.h"
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
-#include "ConnectionDetail.h"
+#include "comm/TcpAcceptor.h"
 #include "fde.h"
 #include "protos.h"
 #include "SquidTime.h"
 
+namespace Comm {
+    CBDATA_CLASS_INIT(TcpAcceptor);
+};
+
+Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+                               const char *note, const Subscription::Pointer &aSub) :
+        AsyncJob("Comm::TcpAcceptor"),
+        errcode(0),
+        isLimited(0),
+        theCallSub(aSub),
+        fd(listenFd),
+        local_addr(laddr),
+        newFd_(-1)
+{
+    /* open the conn if its not already open */
+    if (fd < 0) {
+        fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, local_addr, flags, note);
+        errcode = errno;
+
+        if (fd < 0) {
+            debugs(5, DBG_CRITICAL, HERE << "comm_open failed: FD " << fd << ", " << local_addr << " error: " << errcode);
+            return;
+        }
+        debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd << ", " << local_addr);
+    }
+}
+
+void
+Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub)
+{
+    debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << aSub);
+    unsubscribe("subscription change");
+    theCallSub = aSub;
+}
+
+void
+Comm::TcpAcceptor::unsubscribe(const char *reason)
+{
+    debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription " << theCallSub << " removed: " << reason);
+    theCallSub = NULL;
+}
+
+void
+Comm::TcpAcceptor::start()
+{
+    debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << theCallSub);
+
+    Must(isOpen(fd));
+
+    setListen();
+
+    // if no error so far start accepting connections.
+    if (errcode == 0)
+        commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+bool
+Comm::TcpAcceptor::doneAll() const
+{
+    // stop when FD is closed
+    if (!isOpen(fd)) {
+        return AsyncJob::doneAll();
+    }
+
+    // stop when handlers are gone
+    if (theCallSub == NULL) {
+        return AsyncJob::doneAll();
+    }
+
+    // open FD with handlers...keep accepting.
+    return false;
+}
+
+void
+Comm::TcpAcceptor::swanSong()
+{
+    debugs(5,5, HERE);
+    unsubscribe("swanSong");
+    fd = -1;
+    AcceptLimiter::Instance().removeDead(this);
+    AsyncJob::swanSong();
+}
+
 /**
  * New-style listen and accept routines
  *
@@ -50,11 +133,11 @@
  * accept()ed some time later.
  */
 void
-Comm::ListenStateData::setListen()
+Comm::TcpAcceptor::setListen()
 {
     errcode = 0; // reset local errno copy.
     if (listen(fd, Squid_MaxFD >> 2) < 0) {
-        debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+        debugs(50, DBG_CRITICAL, "ERROR: listen(FD " << fd << ", " << local_addr << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
         errcode = errno;
         return;
     }
@@ -66,37 +149,19 @@
         debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
         xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
         if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
-            debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+            debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
 #elif defined(TCP_DEFER_ACCEPT)
         int seconds = 30;
         if (strncmp(Config.accept_filter, "data=", 5) == 0)
             seconds = atoi(Config.accept_filter + 5);
         if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
-            debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+            debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
 #else
-        debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+        debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS");
 #endif
     }
 }
 
-Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
-        fd(aFd),
-        theCallback(call),
-        mayAcceptMore(accept_many)
-{
-    assert(aFd >= 0);
-    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
-    assert(isOpen(aFd));
-    setListen();
-    commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
-Comm::ListenStateData::~ListenStateData()
-{
-    comm_close(fd);
-    fd = -1;
-}
-
 /**
  * This private callback is called whenever a filedescriptor is ready
  * to dupe itself and fob off an accept()ed connection
@@ -107,23 +172,30 @@
  * done later when enough sockets become available.
  */
 void
-Comm::ListenStateData::doAccept(int fd, void *data)
+Comm::TcpAcceptor::doAccept(int fd, void *data)
 {
-    debugs(5, 2, HERE << "New connection on FD " << fd);
-
-    assert(isOpen(fd));
-    ListenStateData *afd = static_cast<ListenStateData*>(data);
-
-    if (!okToAccept()) {
-        AcceptLimiter::Instance().defer(afd);
-    } else {
-        afd->acceptNext();
+    try {
+        debugs(5, 2, HERE << "New connection on FD " << fd);
+
+        Must(isOpen(fd));
+        TcpAcceptor *afd = static_cast<TcpAcceptor*>(data);
+
+        if (!okToAccept()) {
+            AcceptLimiter::Instance().defer(afd);
+        } else {
+            afd->acceptNext();
+        }
+        commSetSelect(fd, COMM_SELECT_READ, Comm::TcpAcceptor::doAccept, afd, 0);
+
+    } catch(const TextException &e) {
+        fatalf("FATAL: error while accepting new client connection: %s\n", e.message);
+    } catch(...) {
+        fatal("FATAL: error while accepting new client connection: [unkown]\n");
     }
-    commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
 }
 
 bool
-Comm::ListenStateData::okToAccept()
+Comm::TcpAcceptor::okToAccept()
 {
     static time_t last_warn = 0;
 
@@ -139,7 +211,7 @@
 }
 
 void
-Comm::ListenStateData::acceptOne()
+Comm::TcpAcceptor::acceptOne()
 {
     /*
      * We don't worry about running low on FDs here.  Instead,
@@ -148,42 +220,44 @@
      */
 
     /* Accept a new connection */
-    ConnectionDetail connDetails;
-    int newfd = oldAccept(connDetails);
+    ConnectionDetail newConnDetails;
+    comm_err_t status = oldAccept(newConnDetails);
 
     /* Check for errors */
-    if (newfd < 0) {
+    if (!isOpen(newFd_)) {
 
-        if (newfd == COMM_NOMESSAGE) {
+        if (status == COMM_NOMESSAGE) {
             /* register interest again */
-            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
+            debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub);
             commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
             return;
         }
 
         // A non-recoverable error; notify the caller */
-        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
-        notify(-1, COMM_ERROR, connDetails);
-        mayAcceptMore = false;
+        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << ", " << local_addr << " handler Subscription: " << theCallSub);
+        notify(status, newConnDetails);
+        mustStop("Listener socket closed");
         return;
     }
 
-    debugs(5, 5, HERE << "accepted: FD " << fd <<
-           " newfd: " << newfd << " from: " << connDetails.peer <<
-           " handler: " << theCallback);
-    notify(newfd, COMM_OK, connDetails);
+    debugs(5, 5, HERE << "Listener: FD " << fd <<
+           " accepted new connection from " << newConnDetails.peer <<
+           " handler Subscription: " << theCallSub);
+    notify(status, newConnDetails);
 }
 
 void
-Comm::ListenStateData::acceptNext()
+Comm::TcpAcceptor::acceptNext()
 {
-    assert(isOpen(fd));
+    Must(isOpen(fd));
     debugs(5, 2, HERE << "connection on FD " << fd);
     acceptOne();
 }
 
+// XXX: obsolete comment?
+// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback().
 void
-Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails)
+Comm::TcpAcceptor::notify(comm_err_t flag, const ConnectionDetail &connDetails)
 {
     // listener socket handlers just abandon the port with COMM_ERR_CLOSING
     // it should only happen when this object is deleted...
@@ -191,26 +265,33 @@
         return;
     }
 
-    if (theCallback != NULL) {
-        typedef CommAcceptCbParams Params;
-        Params &params = GetCommParams<Params>(theCallback);
+    if (theCallSub != NULL) {
+        AsyncCall::Pointer call = theCallSub->callback();
+        CommAcceptCbParams &params = GetCommParams<CommAcceptCbParams>(call);
         params.fd = fd;
-        params.nfd = newfd;
+        params.nfd = newFd_;
         params.details = connDetails;
         params.flag = flag;
         params.xerrno = errcode;
-        ScheduleCallHere(theCallback);
-        if (!mayAcceptMore)
-            theCallback = NULL;
+        ScheduleCallHere(call);
     }
+
+    // drop the temporary recent accepted socket FD details.
+    // this prevents information crossover on calls.
+    newFd_ = -1;
 }
 
 /**
  * accept() and process
- * Wait for an incoming connection on FD.
+ * Wait for an incoming connection on our listener socket.
+ *
+ * \retval COMM_OK         success. details parameter filled.
+ * \retval COMM_NOMESSAGE  attempted accept() but nothing useful came in.
+ * \retval COMM_ERROR      an outright failure occurred.
+ *                         Or if this client has too many connections already.
  */
-int
-Comm::ListenStateData::oldAccept(ConnectionDetail &details)
+comm_err_t
+Comm::TcpAcceptor::oldAccept(ConnectionDetail &details)
 {
     PROF_start(comm_accept);
     statCounter.syscalls.sock.accepts++;
@@ -238,6 +319,8 @@
         }
     }
 
+    Must(sock >= 0);
+    newFd_ = sock;
     details.peer = *gai;
 
     if ( Config.client_ip_max_connections >= 0) {
@@ -248,15 +331,16 @@
         }
     }
 
+    // lookup the local-end details of this new connection
     details.me.InitAddrInfo(gai);
-
     details.me.SetEmpty();
     getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
     details.me = *gai;
-
-    commSetCloseOnExec(sock);
+    details.me.FreeAddrInfo(gai);
 
     /* fdstat update */
+    // XXX : these are not all HTTP requests. use a note about type and ip:port details->
+    // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port"
     fd_open(sock, FD_SOCKET, "HTTP Request");
 
     fdd_table[sock].close_file = NULL;
@@ -265,15 +349,16 @@
     fde *F = &fd_table[sock];
     details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
     F->remote_port = details.peer.GetPort();
-    F->local_addr.SetPort(details.me.GetPort());
+    F->local_addr = details.me;
     F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET;
-    details.me.FreeAddrInfo(gai);
 
+    // set socket flags
+    commSetCloseOnExec(sock);
     commSetNonBlocking(sock);
 
     /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
     F->flags.transparent = fd_table[fd].flags.transparent;
 
     PROF_stop(comm_accept);
-    return sock;
+    return COMM_OK;
 }

=== renamed file 'src/comm/ListenStateData.h' => 'src/comm/TcpAcceptor.h'
--- src/comm/ListenStateData.h	2010-11-27 01:58:38 +0000
+++ src/comm/TcpAcceptor.h	2011-01-09 00:24:09 +0000
@@ -1,39 +1,89 @@
-#ifndef SQUID_LISTENERSTATEDATA_H
-#define SQUID_LISTENERSTATEDATA_H
+#ifndef SQUID_COMM_TCPACCEPTOR_H
+#define SQUID_COMM_TCPACCEPTOR_H
 
 #include "base/AsyncCall.h"
-#include "comm.h"
+#include "base/Subscription.h"
+#include "CommCalls.h"
+#include "comm_err_t.h"
+#include "comm/TcpAcceptor.h"
+#include "ip/Address.h"
+
 #if HAVE_MAP
 #include <map>
 #endif
 
-class ConnectionDetail;
-
 namespace Comm
 {
 
-class ListenStateData
+class AcceptLimiter;
+
+/**
+ * Listens on an FD for new incoming connections and
+ * emits an active FD descriptor for the new client.
+ *
+ * Handles all event limiting required to quash inbound connection
+ * floods within the global FD limits of available Squid_MaxFD and
+ * client_ip_max_connections.
+ *
+ * Fills the emitted connection with all connection details able to
+ * be looked up. Currently these are the local/remote IP:port details
+ * and the listening socket transparent-mode flag.
+ */
+class TcpAcceptor : public AsyncJob
 {
+private:
+    virtual void start();
+    virtual bool doneAll() const;
+    virtual void swanSong();
 
 public:
-    ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many);
-    ListenStateData(const ListenStateData &r); // not implemented.
-    ~ListenStateData();
-
-    void subscribe(AsyncCall::Pointer &call);
+    TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+                const char *note, const Subscription::Pointer &aSub);
+
+    TcpAcceptor(const TcpAcceptor &r); // not implemented.
+
+    /** Subscribe a handler to receive calls back about new connections.
+     * Replaces any existing subscribed handler.
+     */
+    void subscribe(const Subscription::Pointer &aSub);
+
+    /** Remove the currently waiting callback subscription.
+     * Pending calls will remain scheduled.
+     */
+    void unsubscribe(const char *reason);
+
+    /** Try and accept another connection (synchronous).
+     * If one is pending already the subscribed callback handler will be scheduled
+     * to handle it before this method returns.
+     */
     void acceptNext();
-    void notify(int newfd, comm_err_t flag, const ConnectionDetail &details);
 
-    int fd;
+    /// Call the subscribed callback handler with details about a new connection.
+    void notify(comm_err_t flags, const ConnectionDetail &newConnDetails);
 
     /// errno code of the last accept() or listen() action if one occurred.
     int errcode;
 
-    /// whether this socket is delayed and on the AcceptLimiter queue.
-    int32_t isLimited;
-
-private:
-    /// Method to test if there are enough file escriptors to open a new client connection
+private:
+    friend class AcceptLimiter;
+    int32_t isLimited;                   ///< whether this socket is delayed and on the AcceptLimiter queue.
+    Subscription::Pointer theCallSub;    ///< used to generate AsyncCalls handling our events.
+
+public:
+    /// conn being listened on for new connections
+    /// Reserved for read-only use.
+    // NP: public only until we can hide it behind connection handles
+    int fd;
+
+private:
+    /// IP Address and port being listened on
+    Ip::Address local_addr;
+
+    /// temporary holder for newly accepted client FD
+    int newFd_;
+
+private:
+    /// Method to test if there are enough file descriptors to open a new client connection
     /// if not the accept() will be postponed
     static bool okToAccept();
 
@@ -41,14 +91,12 @@
     static void doAccept(int fd, void *data);
 
     void acceptOne();
-    int oldAccept(ConnectionDetail &details);
-
-    AsyncCall::Pointer theCallback;
-    bool mayAcceptMore;
-
+    comm_err_t oldAccept(ConnectionDetail &newConnDetails);
     void setListen();
+
+    CBDATA_CLASS2(TcpAcceptor);
 };
 
 } // namespace Comm
 
-#endif /* SQUID_LISTENERSTATEDATA_H */
+#endif /* SQUID_COMM_TCPACCEPTOR_H */

=== modified file 'src/ftp.cc'
--- src/ftp.cc	2010-12-13 11:31:14 +0000
+++ src/ftp.cc	2011-01-10 00:42:23 +0000
@@ -34,8 +34,9 @@
 
 #include "squid.h"
 #include "comm.h"
+#include "CommCalls.h"
+#include "comm/TcpAcceptor.h"
 #include "comm/Write.h"
-#include "comm/ListenStateData.h"
 #include "compat/strtoll.h"
 #include "ConnectionDetail.h"
 #include "errorpage.h"
@@ -153,13 +154,11 @@
 
     void clear(); /// just resets fd and close handler. does not close active connections.
 
-    int fd; /// channel descriptor; \todo: remove because the closer has it
-
-    /** Current listening socket handler. delete on shutdown or abort.
-     * FTP stores a copy of the FD in the field fd above.
-     * Use close() to properly close the channel.
-     */
-    Comm::ListenStateData *listener;
+    int fd; /// channel descriptor
+
+    Ip::Address local; ///< The local IP address:port this channel is using
+
+    int flags; ///< socket flags used when opening.
 
 private:
     AsyncCall::Pointer closer; /// Comm close handler callback
@@ -245,6 +244,12 @@
     void completedListing(void);
     void dataComplete();
     void dataRead(const CommIoCbParams &io);
+
+    /// ignore timeout on CTRL channel. set read timeout on DATA channel.
+    void switchTimeoutToDataChannel();
+    /// create a data channel acceptor and start listening.
+    void listenForDataChannel(const int fd, const char *note);
+
     int checkAuth(const HttpHeader * req_hdr);
     void checkUrlpath();
     void buildTitleUrl();
@@ -451,10 +456,10 @@
 void
 FtpStateData::dataClosed(const CommCloseCbParams &io)
 {
-    if (data.listener) {
-        delete data.listener;
-        data.listener = NULL;
-        data.fd = -1;
+    debugs(9, 4, HERE);
+    if (data.fd >= 0) {
+        comm_close(data.fd);
+        // NP: clear() below resets data.fd to -1
     }
     data.clear();
     failed(ERR_FTP_FAILURE, 0);
@@ -606,6 +611,28 @@
 }
 
 void
+FtpStateData::switchTimeoutToDataChannel()
+{
+    commSetTimeout(ctrl.fd, -1, NULL, NULL);
+
+    typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout);
+    commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
+}
+
+void
+FtpStateData::listenForDataChannel(const int fd, const char *note)
+{
+    typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> AcceptDialer;
+    typedef AsyncCallT<AcceptDialer> AcceptCall;
+    RefCount<AcceptCall> call = static_cast<AcceptCall*>(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection));
+    Subscription::Pointer sub = new CallSubscription<AcceptCall>(call);
+    Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(fd, data.local, data.flags, note, sub);
+    data.fd = tmp->fd; // Ensure we have a copy of the FD opened for listening.
+    AsyncJob::Start(tmp);
+}
+
+void
 FtpStateData::ftpTimeout(const CommTimeoutCbParams &io)
 {
     debugs(9, 4, "ftpTimeout: FD " << io.fd << ": '" << entry->url() << "'" );
@@ -1066,10 +1093,16 @@
 
     usable = end - sbuf;
 
-    debugs(9, 3, HERE << "usable = " << usable);
+    debugs(9, 3, HERE << "usable = " << usable << " of " << len << " bytes.");
 
     if (usable == 0) {
-        debugs(9, 3, HERE << "didn't find end for " << entry->url()  );
+        if (buf[0] == '\0' && len == 1) {
+            debugs(9, 3, HERE << "NIL ends data from " << entry->url() << " transfer problem?");
+            data.readBuf->consume(len);
+        } else {
+            debugs(9, 3, HERE << "didn't find end for " << entry->url());
+            debugs(9, 3, HERE << "buffer remains (" << len << " bytes) '" << buf << "'");
+        }
         xfree(sbuf);
         return;
     }
@@ -1673,7 +1706,7 @@
          * establish one on the control socket.
          */
 
-        if (data.fd > -1) {
+        if (data.fd >= 0) {
             AsyncCall::Pointer nullCall =  NULL;
             commSetTimeout(data.fd, -1, nullCall);
         }
@@ -2718,27 +2751,21 @@
 static int
 ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
 {
-    int fd;
-    Ip::Address addr;
     struct addrinfo *AI = NULL;
-    int on = 1;
     int x = 0;
 
     /// Close old data channels, if any. We may open a new one below.
-    ftpState->data.close();
+    if (!(ftpState->data.flags & COMM_REUSEADDR))
+        ftpState->data.close();
 
     /*
      * Set up a listen socket on the same local address as the
      * control connection.
      */
-
-    addr.InitAddrInfo(AI);
-
+    ftpState->data.local.InitAddrInfo(AI);
     x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen);
-
-    addr = *AI;
-
-    addr.FreeAddrInfo(AI);
+    ftpState->data.local = *AI;
+    ftpState->data.local.FreeAddrInfo(AI);
 
     if (x) {
         debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror());
@@ -2750,38 +2777,19 @@
      * used for both control and data.
      */
     if (fallback) {
+        int on = 1;
         setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on));
+        ftpState->ctrl.flags |= COMM_REUSEADDR;
+        ftpState->data.flags |= COMM_REUSEADDR;
+        ftpState->data.fd = ftpState->ctrl.fd;
     } else {
         /* if not running in fallback mode a new port needs to be retrieved */
-        addr.SetPort(0);
-    }
-
-    fd = comm_open(SOCK_STREAM,
-                   IPPROTO_TCP,
-                   addr,
-                   COMM_NONBLOCKING | (fallback ? COMM_REUSEADDR : 0),
-                   ftpState->entry->url());
-    debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd  );
-
-    if (fd < 0) {
-        debugs(9, DBG_CRITICAL, HERE << "comm_open failed");
-        return -1;
-    }
-
-    typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-    AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                    acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-    ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false);
-
-    if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) {
-        comm_close(fd);
-        return -1;
-    }
-
-    ftpState->data.opened(fd, ftpState->dataCloser());
-    ftpState->data.port = comm_local_port(fd);
-    ftpState->data.host = NULL;
-    return fd;
+        ftpState->data.local.SetPort(0);
+        ftpState->data.flags = COMM_NONBLOCKING;
+    }
+
+    ftpState->listenForDataChannel(ftpState->data.fd, ftpState->entry->url());
+    return ftpState->data.fd;
 }
 
 /// \ingroup ServerProtocolFTPInternal
@@ -2870,6 +2878,7 @@
     debugs(9, 3, HERE);
     ftpState->flags.pasv_supported = 0;
     fd = ftpOpenListenSocket(ftpState, 0);
+    debugs(9, 3, "Listening for FTP data connection with FD " << fd);
 
     Ip::Address::InitAddrInfo(AI);
 
@@ -2922,77 +2931,68 @@
  */
 void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
 {
-    char ntoapeer[MAX_IPSTRLEN];
-    debugs(9, 3, "ftpAcceptDataConnection");
-
-    // one connection accepted. the handler has stopped listening. drop our local pointer to it.
-    data.listener = NULL;
+    debugs(9, 3, HERE);
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         abortTransaction("entry aborted when accepting data conn");
         return;
     }
 
+    if (io.flag != COMM_OK) {
+        data.close();
+        debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno));
+        /** \todo Need to send error message on control channel*/
+        ftpFail(this);
+        return;
+    }
+
+    /* data listening conn is no longer even open. abort. NP: FD 0 is a valid descriptor, only negative means unset. */
+    if (data.fd < 0 || fd_table[data.fd].flags.open == 0) {
+        data.clear(); // ensure that it's cleared and not just closed.
+        return;
+    }
+
     /** \par
      * When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being
      * made by the remote client which is connected to the FTP control socket.
+     * Or the one which we were told to listen for by control channel messages (may differ under NAT).
      * This prevents third-party hacks, but also third-party load balancing handshakes.
      */
     if (Config.Ftp.sanitycheck) {
+        char ntoapeer[MAX_IPSTRLEN];
         io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN);
 
-        if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) {
+        if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 &&
+                strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) {
             debugs(9, DBG_IMPORTANT,
                    "FTP data connection from unexpected server (" <<
                    io.details.peer << "), expecting " <<
-                   fd_table[ctrl.fd].ipaddr);
+                   fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr);
 
-            /* close the bad soures connection down ASAP. */
+            /* close the bad sources connection down ASAP. */
             comm_close(io.nfd);
 
-            /* we are ony accepting once, so need to re-open the listener socket. */
-            typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-            AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                            acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
-            data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+            /* drop the bad connection (io) by ignoring the attempt. */
             return;
         }
     }
 
-    if (io.flag != COMM_OK) {
-        debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno));
-        /** \todo XXX Need to set error message */
-        ftpFail(this);
-        return;
-    }
-
     /**\par
-     * Replace the Listen socket with the accepted data socket */
+     * Replace the Listening socket with the accepted data socket */
     data.close();
     data.opened(io.nfd, dataCloser());
     data.port = io.details.peer.GetPort();
-    io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN);
+    data.host = xstrdup(fd_table[io.nfd].ipaddr);
 
     debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " <<
            "FD " << io.nfd << " to " << io.details.peer << " FD table says: " <<
            "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " <<
            "data-peer= " << fd_table[data.fd].ipaddr);
 
-
-    AsyncCall::Pointer nullCall = NULL;
-    commSetTimeout(ctrl.fd, -1, nullCall);
-
-    typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-    AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                      TimeoutDialer, this, FtpStateData::ftpTimeout);
-    commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
-
-    /*\todo XXX We should have a flag to track connect state...
-     *    host NULL -> not connected, port == local port
-     *    host set  -> connected, port == remote port
-     */
-    /* Restart state (SENT_NLST/LIST/RETR) */
-    FTP_SM_FUNCS[state] (this);
+    assert(haveControlChannel("ftpAcceptDataConnection"));
+    assert(ctrl.message == NULL);
+
+    // Ctrl channel operations will determine what happens to this data connection
 }
 
 /// \ingroup ServerProtocolFTPInternal
@@ -3087,11 +3087,7 @@
         /*\par
          * When client code is 150 with a hostname, Accept data channel. */
         debugs(9, 3, "ftpReadStor: accepting data channel");
-        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-        AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                        acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
-
-        data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+        listenForDataChannel(data.fd, data.host);
     } else {
         debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
         ftpFail(this);
@@ -3211,34 +3207,16 @@
 
     if (code == 125 || (code == 150 && ftpState->data.host)) {
         /* Begin data transfer */
-        /* XXX what about Config.Timeout.read? */
+        debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+        ftpState->switchTimeoutToDataChannel();
         ftpState->maybeReadVirginBody();
         ftpState->state = READING_DATA;
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
         return;
     } else if (code == 150) {
         /* Accept data channel */
-        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-        AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                        acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
-        typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                          TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
-        commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+        debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+        ftpState->switchTimeoutToDataChannel();
+        ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
         return;
     } else if (!ftpState->flags.tried_nlst && code > 300) {
         ftpSendNlst(ftpState);
@@ -3274,32 +3252,13 @@
     if (code == 125 || (code == 150 && ftpState->data.host)) {
         /* Begin data transfer */
         debugs(9, 3, HERE << "reading data channel");
-        /* XXX what about Config.Timeout.read? */
+        ftpState->switchTimeoutToDataChannel();
         ftpState->maybeReadVirginBody();
         ftpState->state = READING_DATA;
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
     } else if (code == 150) {
         /* Accept data channel */
-        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-        AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                        acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
-        typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                          TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
-        commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+        ftpState->switchTimeoutToDataChannel();
+        ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
     } else if (code >= 300) {
         if (!ftpState->flags.try_slash_hack) {
             /* Try this as a directory missing trailing slash... */
@@ -3952,6 +3911,13 @@
     fd = aFd;
     closer = aCloser;
     comm_add_close_handler(fd, closer);
+
+    // grab the local IP address:port details for this connection
+    struct addrinfo *AI = NULL;
+    local.InitAddrInfo(AI);
+    getsockname(aFd, AI->ai_addr, &AI->ai_addrlen);
+    local = *AI;
+    local.FreeAddrInfo(AI);
 }
 
 /// planned close: removes the close handler and calls comm_close
@@ -3959,15 +3925,11 @@
 FtpChannel::close()
 {
     // channels with active listeners will be closed when the listener handler dies.
-    if (listener) {
-        delete listener;
-        listener = NULL;
-        comm_remove_close_handler(fd, closer);
-        closer = NULL;
-        fd = -1;
-    } else if (fd >= 0) {
-        comm_remove_close_handler(fd, closer);
-        closer = NULL;
+    if (fd >= 0) {
+        if (closer != NULL) {
+            comm_remove_close_handler(fd, closer);
+            closer = NULL;
+        }
         comm_close(fd); // we do not expect to be called back
         fd = -1;
     }

=== modified file 'src/htcp.cc'
--- src/htcp.cc	2010-12-13 11:31:14 +0000
+++ src/htcp.cc	2011-01-09 08:28:49 +0000
@@ -1516,12 +1516,13 @@
     AsyncCall::Pointer call = asyncCall(31, 2,
                                         "htcpIncomingConnectionOpened",
                                         HtcpListeningStartedDialer(&htcpIncomingConnectionOpened));
+    Subscription::Pointer nilSub;
 
     Ipc::StartListening(SOCK_DGRAM,
                         IPPROTO_UDP,
                         incomingAddr,
                         COMM_NONBLOCKING,
-                        Ipc::fdnInHtcpSocket, call);
+                        Ipc::fdnInHtcpSocket, call, nilSub);
 
     if (!Config.Addrs.udp_outgoing.IsNoAddr()) {
         Ip::Address outgoingAddr = Config.Addrs.udp_outgoing;

=== modified file 'src/icp_v2.cc'
--- src/icp_v2.cc	2010-12-13 11:31:14 +0000
+++ src/icp_v2.cc	2011-01-09 08:30:19 +0000
@@ -698,11 +698,13 @@
                                         "icpIncomingConnectionOpened",
                                         IcpListeningStartedDialer(&icpIncomingConnectionOpened, addr));
 
+    Subscription::Pointer nilSub;
+
     Ipc::StartListening(SOCK_DGRAM,
                         IPPROTO_UDP,
                         addr,
                         COMM_NONBLOCKING,
-                        Ipc::fdnInIcpSocket, call);
+                        Ipc::fdnInIcpSocket, call, nilSub);
 
     addr.SetEmpty(); // clear for next use.
     addr = Config.Addrs.udp_outgoing;

=== modified file 'src/ipc/SharedListen.cc'
--- src/ipc/SharedListen.cc	2010-10-28 18:52:59 +0000
+++ src/ipc/SharedListen.cc	2011-01-09 00:21:05 +0000
@@ -150,5 +150,6 @@
     Must(cbd);
     cbd->fd = fd;
     cbd->errNo = response.errNo;
+    cbd->handlerSubscription = por.params.handlerSubscription;
     ScheduleCallHere(por.callback);
 }

=== modified file 'src/ipc/SharedListen.h'
--- src/ipc/SharedListen.h	2010-07-06 23:09:44 +0000
+++ src/ipc/SharedListen.h	2011-01-09 00:18:50 +0000
@@ -9,6 +9,7 @@
 #define SQUID_IPC_SHARED_LISTEN_H
 
 #include "base/AsyncCall.h"
+#include "base/Subscription.h"
 
 namespace Ipc
 {
@@ -28,6 +29,9 @@
     Ip::Address addr; ///< will be memset and memcopied
     int flags;
     int fdNote; ///< index into fd_note() comment strings
+
+    /// handler to subscribe to Comm::TcpAcceptor
+    Subscription::Pointer handlerSubscription;
 };
 
 class TypedMsgHdr;

=== modified file 'src/ipc/StartListening.cc'
--- src/ipc/StartListening.cc	2010-07-06 23:09:44 +0000
+++ src/ipc/StartListening.cc	2011-01-10 00:52:21 +0000
@@ -6,8 +6,9 @@
  */
 
 #include "config.h"
+#include "base/TextException.h"
 #include "comm.h"
-#include "base/TextException.h"
+#include "comm/TcpAcceptor.h"
 #include "ipc/SharedListen.h"
 #include "ipc/StartListening.h"
 
@@ -25,34 +26,41 @@
     return os << "(FD " << fd << ", err=" << errNo;
 }
 
-
-void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr,
-                         int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
+void
+Ipc::StartListening(int sock_type, int proto, Ip::Address &addr, int flags,
+                    FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub)
 {
-    OpenListenerParams p;
-    p.sock_type = sock_type;
-    p.proto = proto;
-    p.addr = addr;
-    p.flags = flags;
-    p.fdNote = fdNote;
-
-    if (UsingSmp()) { // if SMP is on, share
+    if (UsingSmp()) { // if SMP is on, share
+        OpenListenerParams p;
+        p.sock_type = sock_type;
+        p.proto = proto;
+        p.addr = addr;
+        p.flags = flags;
+        p.fdNote = fdNote;
+        p.handlerSubscription = sub;
         Ipc::JoinSharedListen(p, callback);
         return; // wait for the call back
     }
 
+    StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(callback->getDialer());
+    Must(cbd);
+
     enter_suid();
-    const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
-                                        FdNote(p.fdNote));
-    const int errNo = (sock >= 0) ? 0 : errno;
+    if (sock_type == SOCK_STREAM) {
+        // TCP: setup a job to handle accept() with subscribed handler. NOTE(review): cbd->fd is read here before this function assigns it - confirm the dialer initialises it.
+        Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(cbd->fd, addr, flags, FdNote(fdNote), sub);
+        cbd->fd = tmp->fd;
+        AsyncJob::Start(tmp);
+    } else if (sock_type == SOCK_DGRAM) {
+        // UDP: setup the listener socket, but do not set a subscriber
+        // TODO: create a UDP subscription so packet event calls get scheduled and queued Async.
+        cbd->fd = comm_open_listener(sock_type, proto, addr, flags, FdNote(fdNote));
+    } else {
+        fatalf("Invalid Socket Type (%d)",sock_type);
+    }
+    cbd->errNo = cbd->fd >= 0 ? 0 : errno; // NOTE(review): on the TCP path errno is read after AsyncJob::Start() and may be stale - confirm
     leave_suid();
 
-    debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
-
-    StartListeningCb *cbd =
-        dynamic_cast<StartListeningCb*>(callback->getDialer());
-    Must(cbd);
-    cbd->fd = sock;
-    cbd->errNo = errNo;
+    debugs(54, 3, HERE << "opened listen FD " << cbd->fd << " on " << addr);
     ScheduleCallHere(callback);
 }

=== modified file 'src/ipc/StartListening.h'
--- src/ipc/StartListening.h	2010-11-21 04:40:05 +0000
+++ src/ipc/StartListening.h	2011-01-09 00:30:35 +0000
@@ -11,6 +11,7 @@
 #include "ip/forward.h"
 #include "ipc/FdNotes.h"
 #include "base/AsyncCall.h"
+#include "base/Subscription.h"
 
 #if HAVE_IOSFWD
 #include <iosfwd>
@@ -32,12 +33,15 @@
 public:
     int fd; ///< opened listening socket or -1
     int errNo; ///< errno value from the comm_open_listener() call
+
+    /// The subscription we will pass on to the Comm::TcpAcceptor
+    Subscription::Pointer handlerSubscription;
 };
 
 /// Depending on whether SMP is on, either ask Coordinator to send us
 /// the listening FD or call comm_open_listener() directly.
-extern void StartListening(int sock_type, int proto, Ip::Address &addr,
-                           int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
+extern void StartListening(int sock_type, int proto, Ip::Address &addr, int flags,
+                           FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub);
 
 } // namespace Ipc;
 

=== modified file 'src/snmp_core.cc'
--- src/snmp_core.cc	2010-12-13 11:31:14 +0000
+++ src/snmp_core.cc	2011-01-09 08:32:43 +0000
@@ -318,12 +318,13 @@
         AsyncCall::Pointer call = asyncCall(49, 2,
                                             "snmpIncomingConnectionOpened",
                                             SnmpListeningStartedDialer(&snmpIncomingConnectionOpened));
+        Subscription::Pointer nilSub;
 
         Ipc::StartListening(SOCK_DGRAM,
                             IPPROTO_UDP,
                             Config.Addrs.snmp_incoming,
                             COMM_NONBLOCKING,
-                            Ipc::fdnInSnmpSocket, call);
+                            Ipc::fdnInSnmpSocket, call, nilSub);
 
         if (!Config.Addrs.snmp_outgoing.IsNoAddr()) {
             Config.Addrs.snmp_outgoing.SetPort(Config.Port.snmp);
@@ -339,12 +340,13 @@
             AsyncCall::Pointer call = asyncCall(49, 2,
                                                 "snmpOutgoingConnectionOpened",
                                                 SnmpListeningStartedDialer(&snmpOutgoingConnectionOpened));
+            Subscription::Pointer nilSub;
 
             Ipc::StartListening(SOCK_DGRAM,
                                 IPPROTO_UDP,
                                 Config.Addrs.snmp_outgoing,
                                 COMM_NONBLOCKING,
-                                Ipc::fdnOutSnmpSocket, call);
+                                Ipc::fdnOutSnmpSocket, call, nilSub);
         }
     }
 }

Reply via email to