Attached patch contains the cleanup-comm branch changes to
ListenStateData which make it an AsyncJob (called Comm::TcpAcceptor)
using Subscriptions to generate calls.
This fixes the call re-use problem of bug 3081 and a related FTP data
connection bug.
Alex:
please test that this does not break with SMP support. I've run it
with 2 workers on a non-SMP machine, but have not checked for anything
more complex.
NP: there are still FTP problems with EPRT/PORT handling of data
connections from servers which do not terminate their directory transfer
with \r\n. That is outside this patch scope.
Amos
=== modified file 'src/CommCalls.cc'
--- src/CommCalls.cc 2009-07-12 22:56:47 +0000
+++ src/CommCalls.cc 2011-01-09 01:36:41 +0000
@@ -130,6 +130,12 @@
{
}
+CommAcceptCbPtrFun::CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o):
+ CommDialerParamsT<CommAcceptCbParams>(o.params),
+ handler(o.handler)
+{
+}
+
void
CommAcceptCbPtrFun::dial()
{
=== modified file 'src/CommCalls.h'
--- src/CommCalls.h 2010-08-24 00:12:54 +0000
+++ src/CommCalls.h 2011-01-09 04:56:21 +0000
@@ -176,8 +176,11 @@
{
public:
typedef CommAcceptCbParams Params;
+ typedef RefCount<CommAcceptCbPtrFun> Pointer;
CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
+ CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o);
+
void dial();
virtual void print(std::ostream &os) const;
@@ -259,11 +262,17 @@
class CommCbFunPtrCallT: public AsyncCall
{
public:
+ typedef RefCount<CommCbFunPtrCallT<Dialer> > Pointer;
typedef typename Dialer::Params Params;
inline CommCbFunPtrCallT(int debugSection, int debugLevel,
const char *callName, const Dialer &aDialer);
+ inline CommCbFunPtrCallT(const Pointer &p) :
+ AsyncCall(p->debugSection, p->debugLevel, p->name),
+ dialer(p->dialer)
+ {}
+
virtual CallDialer* getDialer() { return &dialer; }
public:
=== modified file 'src/ProtoPort.cc'
--- src/ProtoPort.cc 2010-11-18 08:01:53 +0000
+++ src/ProtoPort.cc 2011-01-09 00:58:29 +0000
@@ -3,15 +3,17 @@
*/
#include "squid.h"
+#include "comm.h"
#include "ProtoPort.h"
#if HAVE_LIMITS
#include <limits>
#endif
-http_port_list::http_port_list(const char *aProtocol)
+http_port_list::http_port_list(const char *aProtocol) :
+ listenFd(-1)
#if USE_SSL
- :
- http(*this), dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
+ , http(*this)
+ , dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
#endif
{
protocol = xstrdup(aProtocol);
@@ -19,7 +21,10 @@
http_port_list::~http_port_list()
{
- delete listener;
+ if (listenFd >= 0) {
+ comm_close(listenFd);
+ listenFd = -1;
+ }
safe_free(name);
safe_free(defaultsite);
=== modified file 'src/ProtoPort.h'
--- src/ProtoPort.h 2010-11-18 08:01:53 +0000
+++ src/ProtoPort.h 2011-01-09 00:55:34 +0000
@@ -4,9 +4,7 @@
#ifndef SQUID_PROTO_PORT_H
#define SQUID_PROTO_PORT_H
-//#include "typedefs.h"
#include "cbdata.h"
-#include "comm/ListenStateData.h"
#if USE_SSL
#include "ssl/gadgets.h"
@@ -43,11 +41,11 @@
} tcp_keepalive;
/**
- * The FD listening socket handler.
- * If not NULL we are actively listening for client requests.
- * delete to close the socket.
+ * The FD listening socket.
+ * If >= 0 we are actively listening for client requests.
+ * use comm_close(listenFd) to stop.
*/
- Comm::ListenStateData *listener;
+ int listenFd;
#if USE_SSL
// XXX: temporary hack to ease move of SSL options to http_port
=== modified file 'src/base/AsyncCall.h'
--- src/base/AsyncCall.h 2010-12-02 23:33:27 +0000
+++ src/base/AsyncCall.h 2011-01-09 04:58:01 +0000
@@ -45,6 +45,8 @@
friend class AsyncCallQueue;
AsyncCall(int aDebugSection, int aDebugLevel, const char *aName);
+ AsyncCall();
+ AsyncCall(const AsyncCall &);
virtual ~AsyncCall();
void make(); // fire if we can; handles general call debugging
@@ -122,6 +124,10 @@
const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName),
dialer(aDialer) {}
+ AsyncCallT(const RefCount<AsyncCallT<Dialer> > &o):
+ AsyncCall(o->debugSection, o->debugLevel, o->name),
+ dialer(o->dialer) {}
+
CallDialer *getDialer() { return &dialer; }
protected:
=== modified file 'src/client_side.cc'
--- src/client_side.cc 2010-12-16 01:15:12 +0000
+++ src/client_side.cc 2011-01-10 00:46:31 +0000
@@ -92,8 +92,9 @@
#include "ClientRequestContext.h"
#include "clientStream.h"
#include "comm.h"
+#include "CommCalls.h"
#include "comm/Write.h"
-#include "comm/ListenStateData.h"
+#include "comm/TcpAcceptor.h"
#include "base/TextException.h"
#include "ConnectionDetail.h"
#include "eui/Config.h"
@@ -135,13 +136,13 @@
#define comm_close comm_lingering_close
#endif
-/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call
+/// dials clientListenerConnectionOpened call
class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
{
public:
- typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg);
- ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg):
- handler(aHandler), portCfg(aPortCfg) {}
+ typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg, bool uses_ssl);
+ ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg, bool aSslFlag):
+ handler(aHandler), portCfg(aPortCfg), uses_ssl(aSslFlag) {}
virtual void print(std::ostream &os) const {
startPrint(os) <<
@@ -149,20 +150,17 @@
}
virtual bool canDial(AsyncCall &) const { return true; }
- virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); }
+ virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg, uses_ssl); }
public:
Handler handler;
private:
http_port_list *portCfg; ///< from Config.Sockaddr.http
+ bool uses_ssl;
};
-
-static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s);
-#if USE_SSL
-static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s);
-#endif
+static void clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl);
/* our socket-related context */
@@ -3122,14 +3120,15 @@
/** Handle a new connection on HTTP socket. */
void
-httpAccept(int sock, int newfd, ConnectionDetail *details,
- comm_err_t flag, int xerrno, void *data)
+httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
{
http_port_list *s = (http_port_list *)data;
ConnStateData *connState = NULL;
if (flag != COMM_OK) {
- debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+ // This should not occur with TcpAcceptor.
+ // However its possible the call was still queued when the client disconnected
+ debugs(33, 1, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
return;
}
@@ -3368,15 +3367,16 @@
/** handle a new HTTPS connection */
static void
-httpsAccept(int sock, int newfd, ConnectionDetail *details,
- comm_err_t flag, int xerrno, void *data)
+httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
{
https_port_list *s = (https_port_list *)data;
SSL_CTX *sslContext = s->staticSslContext.get();
if (flag != COMM_OK) {
+ // This should not occur with TcpAcceptor.
+ // However its possible the call was still queued when the client disconnected
errno = xerrno;
- debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+ debugs(33, 1, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
return;
}
@@ -3625,13 +3625,16 @@
const int openFlags = COMM_NONBLOCKING |
(s->spoof_client_ip ? COMM_TRANSPARENT : 0);
- AsyncCall::Pointer callback = asyncCall(33,2,
- "clientHttpConnectionOpened",
- ListeningStartedDialer(&clientHttpConnectionOpened, s));
- Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags,
- Ipc::fdnHttpSocket, callback);
-
- HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened
+ // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+ typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+ RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s));
+ Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+
+ AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened",
+ ListeningStartedDialer(&clientListenerConnectionOpened, s, false));
+ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall, sub);
+
+ HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened
}
#if USE_SSL
@@ -3644,27 +3647,24 @@
/// process clientHttpConnectionsOpen result
static void
-clientHttpConnectionOpened(int fd, int, http_port_list *s)
+clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl)
{
- if (!OpenedHttpSocket(fd, "Cannot open HTTP Port"))
+ s->listenFd = fd;
+ if (!OpenedHttpSocket(s->listenFd, (uses_ssl?"Cannot open HTTPS Port":"Cannot open HTTP Port")))
return;
Must(s);
-
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
- CommAcceptCbPtrFun(httpAccept, s));
-
- s->listener = new Comm::ListenStateData(fd, call, true);
-
- debugs(1, 1, "Accepting " <<
+ Must(s->listenFd >= 0);
+
+ debugs(1, 1, "Accepting" <<
(s->intercepted ? " intercepted" : "") <<
(s->spoof_client_ip ? " spoofing" : "") <<
(s->sslBump ? " bumpy" : "") <<
(s->accel ? " accelerated" : "")
- << " HTTP connections at " << s->s
- << ", FD " << fd << "." );
+ << " HTTP" << (uses_ssl?"S":"") << " connections at "
+ << " FD " << s->listenFd << " on " << s->s);
- Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
+ Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for
}
#if USE_SSL
@@ -3686,35 +3686,22 @@
continue;
}
- AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened",
- ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http));
-
- Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING,
- Ipc::fdnHttpsSocket, call);
+ const int openFlags = COMM_NONBLOCKING |
+ (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
+
+ // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS
+ typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+ RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s));
+ Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+
+ AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened",
+ ListeningStartedDialer(&clientListenerConnectionOpened, &s->http, true));
+
+ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall, sub);
HttpSockets[NHttpSockets++] = -1;
}
}
-
-/// process clientHttpsConnectionsOpen result
-static void
-clientHttpsConnectionOpened(int fd, int, http_port_list *s)
-{
- if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port"))
- return;
-
- Must(s);
-
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
- CommAcceptCbPtrFun(httpsAccept, s));
-
- s->listener = new Comm::ListenStateData(fd, call, true);
-
- debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
-
- Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
-}
-
#endif
void
@@ -3733,19 +3720,19 @@
clientHttpConnectionsClose(void)
{
for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
- if (s->listener) {
- debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection");
- delete s->listener;
- s->listener = NULL;
+ if (s->listenFd >= 0) {
+ debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection");
+ comm_close(s->listenFd);
+ s->listenFd = -1;
}
}
#if USE_SSL
for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
- if (s->listener) {
- debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection");
- delete s->listener;
- s->listener = NULL;
+ if (s->listenFd >= 0) {
+ debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection");
+ comm_close(s->listenFd);
+ s->listenFd = -1;
}
}
#endif
=== modified file 'src/comm.cc'
--- src/comm.cc 2010-12-06 20:06:08 +0000
+++ src/comm.cc 2011-01-09 10:11:24 +0000
@@ -41,7 +41,7 @@
#include "comm/comm_internal.h"
#include "comm/IoCallback.h"
#include "comm/Write.h"
-#include "comm/ListenStateData.h"
+#include "comm/TcpAcceptor.h"
#include "CommIO.h"
#include "CommRead.h"
#include "ConnectionDetail.h"
@@ -143,7 +143,7 @@
bool
isOpen(const int fd)
{
- return fd_table[fd].flags.open != 0;
+ return fd >= 0 && fd_table[fd].flags.open != 0;
}
/**
=== modified file 'src/comm/AcceptLimiter.cc'
--- src/comm/AcceptLimiter.cc 2009-12-31 02:35:01 +0000
+++ src/comm/AcceptLimiter.cc 2011-01-10 00:52:51 +0000
@@ -1,6 +1,6 @@
#include "config.h"
#include "comm/AcceptLimiter.h"
-#include "comm/ListenStateData.h"
+#include "comm/TcpAcceptor.h"
#include "fde.h"
Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
@@ -11,7 +11,7 @@
}
void
-Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd)
{
afd->isLimited++;
debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
@@ -19,14 +19,33 @@
}
void
+Comm::AcceptLimiter::removeDead(const Comm::TcpAcceptor *afd)
+{
+ for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) {
+ if (deferred[i] == afd) {
+ deferred[i]->isLimited--;
+ deferred[i] = NULL; // fast. kick() will skip empty entries later.
+ debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+ }
+ }
+}
+
+void
Comm::AcceptLimiter::kick()
{
+ // TODO: this could be optimized further with an iterator to search
+ // looking for first non-NULL, followed by dumping the first N
+ // with only one shift()/pop_front operation
+
debugs(5, 5, HERE << " size=" << deferred.size());
- if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
- debugs(5, 5, HERE << " doing one.");
+ while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
/* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
- ListenStateData *temp = deferred.shift();
- temp->isLimited--;
- temp->acceptNext();
+ TcpAcceptor *temp = deferred.shift();
+ if (temp != NULL) {
+ debugs(5, 5, HERE << " doing one.");
+ temp->isLimited--;
+ temp->acceptNext();
+ break;
+ }
}
}
=== modified file 'src/comm/AcceptLimiter.h'
--- src/comm/AcceptLimiter.h 2010-01-13 01:13:17 +0000
+++ src/comm/AcceptLimiter.h 2011-01-08 14:09:20 +0000
@@ -6,7 +6,7 @@
namespace Comm
{
-class ListenStateData;
+class TcpAcceptor;
/**
* FIFO Queue holding listener socket handlers which have been activated
@@ -25,7 +25,10 @@
static AcceptLimiter &Instance();
/** delay accepting a new client connection. */
- void defer(Comm::ListenStateData *afd);
+ void defer(Comm::TcpAcceptor *afd);
+
+ /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */
+ void removeDead(const Comm::TcpAcceptor *afd);
/** try to accept and begin processing any delayed client connections. */
void kick();
@@ -34,7 +37,7 @@
static AcceptLimiter Instance_;
/** FIFO queue */
- Vector<Comm::ListenStateData*> deferred;
+ Vector<Comm::TcpAcceptor*> deferred;
};
}; // namepace Comm
=== modified file 'src/comm/Makefile.am'
--- src/comm/Makefile.am 2010-11-27 01:46:22 +0000
+++ src/comm/Makefile.am 2011-01-08 11:40:46 +0000
@@ -7,8 +7,8 @@
libcomm_la_SOURCES= \
AcceptLimiter.cc \
AcceptLimiter.h \
- ListenStateData.cc \
- ListenStateData.h \
+ TcpAcceptor.cc \
+ TcpAcceptor.h \
\
IoCallback.cc \
IoCallback.h \
=== renamed file 'src/comm/ListenStateData.cc' => 'src/comm/TcpAcceptor.cc'
--- src/comm/ListenStateData.cc 2010-08-10 03:11:19 +0000
+++ src/comm/TcpAcceptor.cc 2011-01-09 09:51:14 +0000
@@ -33,15 +33,98 @@
*/
#include "squid.h"
+#include "base/TextException.h"
#include "CommCalls.h"
#include "comm/AcceptLimiter.h"
#include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
-#include "ConnectionDetail.h"
+#include "comm/TcpAcceptor.h"
#include "fde.h"
#include "protos.h"
#include "SquidTime.h"
+namespace Comm {
+ CBDATA_CLASS_INIT(TcpAcceptor);
+};
+
+Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+ const char *note, const Subscription::Pointer &aSub) :
+ AsyncJob("Comm::TcpAcceptor"),
+ errcode(0),
+ isLimited(0),
+ theCallSub(aSub),
+ fd(listenFd),
+ local_addr(laddr),
+ newFd_(-1)
+{
+ /* open the conn if its not already open */
+ if (fd < 0) {
+ fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, local_addr, flags, note);
+ errcode = errno;
+
+ if (fd < 0) {
+ debugs(5, DBG_CRITICAL, HERE << "comm_open failed: FD " << fd << ", " << local_addr << " error: " << errcode);
+ return;
+ }
+ debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd << ", " << local_addr);
+ }
+}
+
+void
+Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub)
+{
+ debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << aSub);
+ unsubscribe("subscription change");
+ theCallSub = aSub;
+}
+
+void
+Comm::TcpAcceptor::unsubscribe(const char *reason)
+{
+ debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription " << theCallSub << " removed: " << reason);
+ theCallSub = NULL;
+}
+
+void
+Comm::TcpAcceptor::start()
+{
+ debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << theCallSub);
+
+ Must(isOpen(fd));
+
+ setListen();
+
+ // if no error so far start accepting connections.
+ if (errcode == 0)
+ commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+bool
+Comm::TcpAcceptor::doneAll() const
+{
+ // stop when FD is closed
+ if (!isOpen(fd)) {
+ return AsyncJob::doneAll();
+ }
+
+ // stop when handlers are gone
+ if (theCallSub == NULL) {
+ return AsyncJob::doneAll();
+ }
+
+ // open FD with handlers...keep accepting.
+ return false;
+}
+
+void
+Comm::TcpAcceptor::swanSong()
+{
+ debugs(5,5, HERE);
+ unsubscribe("swanSong");
+ fd = -1;
+ AcceptLimiter::Instance().removeDead(this);
+ AsyncJob::swanSong();
+}
+
/**
* New-style listen and accept routines
*
@@ -50,11 +133,11 @@
* accept()ed some time later.
*/
void
-Comm::ListenStateData::setListen()
+Comm::TcpAcceptor::setListen()
{
errcode = 0; // reset local errno copy.
if (listen(fd, Squid_MaxFD >> 2) < 0) {
- debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+ debugs(50, DBG_CRITICAL, "ERROR: listen(FD " << fd << ", " << local_addr << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
errcode = errno;
return;
}
@@ -66,37 +149,19 @@
debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
- debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+ debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
#elif defined(TCP_DEFER_ACCEPT)
int seconds = 30;
if (strncmp(Config.accept_filter, "data=", 5) == 0)
seconds = atoi(Config.accept_filter + 5);
if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
- debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+ debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
#else
- debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+ debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS");
#endif
}
}
-Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
- fd(aFd),
- theCallback(call),
- mayAcceptMore(accept_many)
-{
- assert(aFd >= 0);
- debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
- assert(isOpen(aFd));
- setListen();
- commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
-Comm::ListenStateData::~ListenStateData()
-{
- comm_close(fd);
- fd = -1;
-}
-
/**
* This private callback is called whenever a filedescriptor is ready
* to dupe itself and fob off an accept()ed connection
@@ -107,23 +172,30 @@
* done later when enough sockets become available.
*/
void
-Comm::ListenStateData::doAccept(int fd, void *data)
+Comm::TcpAcceptor::doAccept(int fd, void *data)
{
- debugs(5, 2, HERE << "New connection on FD " << fd);
-
- assert(isOpen(fd));
- ListenStateData *afd = static_cast<ListenStateData*>(data);
-
- if (!okToAccept()) {
- AcceptLimiter::Instance().defer(afd);
- } else {
- afd->acceptNext();
+ try {
+ debugs(5, 2, HERE << "New connection on FD " << fd);
+
+ Must(isOpen(fd));
+ TcpAcceptor *afd = static_cast<TcpAcceptor*>(data);
+
+ if (!okToAccept()) {
+ AcceptLimiter::Instance().defer(afd);
+ } else {
+ afd->acceptNext();
+ }
+ commSetSelect(fd, COMM_SELECT_READ, Comm::TcpAcceptor::doAccept, afd, 0);
+
+ } catch(const TextException &e) {
+ fatalf("FATAL: error while accepting new client connection: %s\n", e.message);
+ } catch(...) {
+ fatal("FATAL: error while accepting new client connection: [unkown]\n");
}
- commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
}
bool
-Comm::ListenStateData::okToAccept()
+Comm::TcpAcceptor::okToAccept()
{
static time_t last_warn = 0;
@@ -139,7 +211,7 @@
}
void
-Comm::ListenStateData::acceptOne()
+Comm::TcpAcceptor::acceptOne()
{
/*
* We don't worry about running low on FDs here. Instead,
@@ -148,42 +220,44 @@
*/
/* Accept a new connection */
- ConnectionDetail connDetails;
- int newfd = oldAccept(connDetails);
+ ConnectionDetail newConnDetails;
+ comm_err_t status = oldAccept(newConnDetails);
/* Check for errors */
- if (newfd < 0) {
+ if (!isOpen(newFd_)) {
- if (newfd == COMM_NOMESSAGE) {
+ if (status == COMM_NOMESSAGE) {
/* register interest again */
- debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
+ debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub);
commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
return;
}
// A non-recoverable error; notify the caller */
- debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
- notify(-1, COMM_ERROR, connDetails);
- mayAcceptMore = false;
+ debugs(5, 5, HERE << "non-recoverable error: FD " << fd << ", " << local_addr << " handler Subscription: " << theCallSub);
+ notify(status, newConnDetails);
+ mustStop("Listener socket closed");
return;
}
- debugs(5, 5, HERE << "accepted: FD " << fd <<
- " newfd: " << newfd << " from: " << connDetails.peer <<
- " handler: " << theCallback);
- notify(newfd, COMM_OK, connDetails);
+ debugs(5, 5, HERE << "Listener: FD " << fd <<
+ " accepted new connection from " << newConnDetails.peer <<
+ " handler Subscription: " << theCallSub);
+ notify(status, newConnDetails);
}
void
-Comm::ListenStateData::acceptNext()
+Comm::TcpAcceptor::acceptNext()
{
- assert(isOpen(fd));
+ Must(isOpen(fd));
debugs(5, 2, HERE << "connection on FD " << fd);
acceptOne();
}
+// XXX: obsolete comment?
+// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback().
void
-Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails)
+Comm::TcpAcceptor::notify(comm_err_t flag, const ConnectionDetail &connDetails)
{
// listener socket handlers just abandon the port with COMM_ERR_CLOSING
// it should only happen when this object is deleted...
@@ -191,26 +265,33 @@
return;
}
- if (theCallback != NULL) {
- typedef CommAcceptCbParams Params;
- Params ¶ms = GetCommParams<Params>(theCallback);
+ if (theCallSub != NULL) {
+ AsyncCall::Pointer call = theCallSub->callback();
+ CommAcceptCbParams ¶ms = GetCommParams<CommAcceptCbParams>(call);
params.fd = fd;
- params.nfd = newfd;
+ params.nfd = newFd_;
params.details = connDetails;
params.flag = flag;
params.xerrno = errcode;
- ScheduleCallHere(theCallback);
- if (!mayAcceptMore)
- theCallback = NULL;
+ ScheduleCallHere(call);
}
+
+ // drop the temporary recent accepted socket FD details.
+ // this prevents information crossover on calls.
+ newFd_ = -1;
}
/**
* accept() and process
- * Wait for an incoming connection on FD.
+ * Wait for an incoming connection on our listener socket.
+ *
+ * \retval COMM_OK success. details parameter filled.
+ * \retval COMM_NOMESSAGE attempted accept() but nothing useful came in.
+ * \retval COMM_ERROR an outright failure occured.
+ * Or if this client has too many connections already.
*/
-int
-Comm::ListenStateData::oldAccept(ConnectionDetail &details)
+comm_err_t
+Comm::TcpAcceptor::oldAccept(ConnectionDetail &details)
{
PROF_start(comm_accept);
statCounter.syscalls.sock.accepts++;
@@ -238,6 +319,8 @@
}
}
+ Must(sock >= 0);
+ newFd_ = sock;
details.peer = *gai;
if ( Config.client_ip_max_connections >= 0) {
@@ -248,15 +331,16 @@
}
}
+ // lookup the local-end details of this new connection
details.me.InitAddrInfo(gai);
-
details.me.SetEmpty();
getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
details.me = *gai;
-
- commSetCloseOnExec(sock);
+ details.me.FreeAddrInfo(gai);
/* fdstat update */
+ // XXX : these are not all HTTP requests. use a note about type and ip:port details->
+ // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port"
fd_open(sock, FD_SOCKET, "HTTP Request");
fdd_table[sock].close_file = NULL;
@@ -265,15 +349,16 @@
fde *F = &fd_table[sock];
details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
F->remote_port = details.peer.GetPort();
- F->local_addr.SetPort(details.me.GetPort());
+ F->local_addr = details.me;
F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET;
- details.me.FreeAddrInfo(gai);
+ // set socket flags
+ commSetCloseOnExec(sock);
commSetNonBlocking(sock);
/* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
F->flags.transparent = fd_table[fd].flags.transparent;
PROF_stop(comm_accept);
- return sock;
+ return COMM_OK;
}
=== renamed file 'src/comm/ListenStateData.h' => 'src/comm/TcpAcceptor.h'
--- src/comm/ListenStateData.h 2010-11-27 01:58:38 +0000
+++ src/comm/TcpAcceptor.h 2011-01-09 00:24:09 +0000
@@ -1,39 +1,89 @@
-#ifndef SQUID_LISTENERSTATEDATA_H
-#define SQUID_LISTENERSTATEDATA_H
+#ifndef SQUID_COMM_TCPACCEPTOR_H
+#define SQUID_COMM_TCPACCEPTOR_H
#include "base/AsyncCall.h"
-#include "comm.h"
+#include "base/Subscription.h"
+#include "CommCalls.h"
+#include "comm_err_t.h"
+#include "comm/TcpAcceptor.h"
+#include "ip/Address.h"
+
#if HAVE_MAP
#include <map>
#endif
-class ConnectionDetail;
-
namespace Comm
{
-class ListenStateData
+class AcceptLimiter;
+
+/**
+ * Listens on an FD for new incoming connections and
+ * emits an active FD descriptor for the new client.
+ *
+ * Handles all event limiting required to quash inbound connection
+ * floods within the global FD limits of available Squid_MaxFD and
+ * client_ip_max_connections.
+ *
+ * Fills the emitted connection with all connection details able to
+ * be looked up. Currently these are the local/remote IP:port details
+ * and the listening socket transparent-mode flag.
+ */
+class TcpAcceptor : public AsyncJob
{
+private:
+ virtual void start();
+ virtual bool doneAll() const;
+ virtual void swanSong();
public:
- ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many);
- ListenStateData(const ListenStateData &r); // not implemented.
- ~ListenStateData();
-
- void subscribe(AsyncCall::Pointer &call);
+ TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+ const char *note, const Subscription::Pointer &aSub);
+
+ TcpAcceptor(const TcpAcceptor &r); // not implemented.
+
+ /** Subscribe a handler to receive calls back about new connections.
+ * Replaces any existing subscribed handler.
+ */
+ void subscribe(const Subscription::Pointer &aSub);
+
+ /** Remove the currently waiting callback subscription.
+ * Pending calls will remain scheduled.
+ */
+ void unsubscribe(const char *reason);
+
+ /** Try and accept another connection (synchronous).
+ * If one is pending already the subscribed callback handler will be scheduled
+ * to handle it before this method returns.
+ */
void acceptNext();
- void notify(int newfd, comm_err_t flag, const ConnectionDetail &details);
- int fd;
+ /// Call the subscribed callback handler with details about a new connection.
+ void notify(comm_err_t flags, const ConnectionDetail &newConnDetails);
/// errno code of the last accept() or listen() action if one occurred.
int errcode;
- /// whether this socket is delayed and on the AcceptLimiter queue.
- int32_t isLimited;
-
-private:
- /// Method to test if there are enough file escriptors to open a new client connection
+private:
+ friend class AcceptLimiter;
+ int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue.
+ Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events.
+
+public:
+ /// conn being listened on for new connections
+ /// Reserved for read-only use.
+ // NP: public only until we can hide it behind connection handles
+ int fd;
+
+private:
+ /// IP Address and port being listened on
+ Ip::Address local_addr;
+
+ /// temporary holder for newely accepted client FD
+ int newFd_;
+
+private:
+ /// Method to test if there are enough file descriptors to open a new client connection
/// if not the accept() will be postponed
static bool okToAccept();
@@ -41,14 +91,12 @@
static void doAccept(int fd, void *data);
void acceptOne();
- int oldAccept(ConnectionDetail &details);
-
- AsyncCall::Pointer theCallback;
- bool mayAcceptMore;
-
+ comm_err_t oldAccept(ConnectionDetail &newConnDetails);
void setListen();
+
+ CBDATA_CLASS2(TcpAcceptor);
};
} // namespace Comm
-#endif /* SQUID_LISTENERSTATEDATA_H */
+#endif /* SQUID_COMM_TCPACCEPTOR_H */
=== modified file 'src/ftp.cc'
--- src/ftp.cc 2010-12-13 11:31:14 +0000
+++ src/ftp.cc 2011-01-10 00:42:23 +0000
@@ -34,8 +34,9 @@
#include "squid.h"
#include "comm.h"
+#include "CommCalls.h"
+#include "comm/TcpAcceptor.h"
#include "comm/Write.h"
-#include "comm/ListenStateData.h"
#include "compat/strtoll.h"
#include "ConnectionDetail.h"
#include "errorpage.h"
@@ -153,13 +154,11 @@
void clear(); /// just resets fd and close handler. does not close active connections.
- int fd; /// channel descriptor; \todo: remove because the closer has it
-
- /** Current listening socket handler. delete on shutdown or abort.
- * FTP stores a copy of the FD in the field fd above.
- * Use close() to properly close the channel.
- */
- Comm::ListenStateData *listener;
+ int fd; /// channel descriptor
+
+ Ip::Address local; ///< The local IP address:port this channel is using
+
+ int flags; ///< socket flags used when opening.
private:
AsyncCall::Pointer closer; /// Comm close handler callback
@@ -245,6 +244,12 @@
void completedListing(void);
void dataComplete();
void dataRead(const CommIoCbParams &io);
+
+ /// ignore timeout on CTRL channel. set read timeout on DATA channel.
+ void switchTimeoutToDataChannel();
+ /// create a data channel acceptor and start listening.
+ void listenForDataChannel(const int fd, const char *note);
+
int checkAuth(const HttpHeader * req_hdr);
void checkUrlpath();
void buildTitleUrl();
@@ -451,10 +456,10 @@
void
FtpStateData::dataClosed(const CommCloseCbParams &io)
{
- if (data.listener) {
- delete data.listener;
- data.listener = NULL;
- data.fd = -1;
+ debugs(9, 4, HERE);
+ if (data.fd >= 0) {
+ comm_close(data.fd);
+ // NP clear() does the: data.fd = -1;
}
data.clear();
failed(ERR_FTP_FAILURE, 0);
@@ -606,6 +611,28 @@
}
void
+FtpStateData::switchTimeoutToDataChannel()
+{
+ commSetTimeout(ctrl.fd, -1, NULL, NULL);
+
+ typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout);
+ commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
+}
+
+void
+FtpStateData::listenForDataChannel(const int fd, const char *note)
+{
+ typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> AcceptDialer;
+ typedef AsyncCallT<AcceptDialer> AcceptCall;
+ RefCount<AcceptCall> call = static_cast<AcceptCall*>(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection));
+ Subscription::Pointer sub = new CallSubscription<AcceptCall>(call);
+ Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(fd, data.local, data.flags, note, sub);
+ data.fd = tmp->fd; // Ensure we have a copy of the FD opened for listening.
+ AsyncJob::Start(tmp);
+}
+
+void
FtpStateData::ftpTimeout(const CommTimeoutCbParams &io)
{
debugs(9, 4, "ftpTimeout: FD " << io.fd << ": '" << entry->url() << "'" );
@@ -1066,10 +1093,16 @@
usable = end - sbuf;
- debugs(9, 3, HERE << "usable = " << usable);
+ debugs(9, 3, HERE << "usable = " << usable << " of " << len << " bytes.");
if (usable == 0) {
- debugs(9, 3, HERE << "didn't find end for " << entry->url() );
+ if (buf[0] == '\0' && len == 1) {
+ debugs(9, 3, HERE << "NIL ends data from " << entry->url() << " transfer problem?");
+ data.readBuf->consume(len);
+ } else {
+ debugs(9, 3, HERE << "didn't find end for " << entry->url());
+ debugs(9, 3, HERE << "buffer remains (" << len << " bytes) '" << buf << "'");
+ }
xfree(sbuf);
return;
}
@@ -1673,7 +1706,7 @@
* establish one on the control socket.
*/
- if (data.fd > -1) {
+ if (data.fd >= 0) {
AsyncCall::Pointer nullCall = NULL;
commSetTimeout(data.fd, -1, nullCall);
}
@@ -2718,27 +2751,21 @@
static int
ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
{
- int fd;
- Ip::Address addr;
struct addrinfo *AI = NULL;
- int on = 1;
int x = 0;
/// Close old data channels, if any. We may open a new one below.
- ftpState->data.close();
+ if (!(ftpState->data.flags & COMM_REUSEADDR))
+ ftpState->data.close();
/*
* Set up a listen socket on the same local address as the
* control connection.
*/
-
- addr.InitAddrInfo(AI);
-
+ ftpState->data.local.InitAddrInfo(AI);
x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen);
-
- addr = *AI;
-
- addr.FreeAddrInfo(AI);
+ ftpState->data.local = *AI;
+ ftpState->data.local.FreeAddrInfo(AI);
if (x) {
debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror());
@@ -2750,38 +2777,19 @@
* used for both control and data.
*/
if (fallback) {
+ int on = 1;
setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on));
+ ftpState->ctrl.flags |= COMM_REUSEADDR;
+ ftpState->data.flags |= COMM_REUSEADDR;
+ ftpState->data.fd = ftpState->ctrl.fd;
} else {
/* if not running in fallback mode a new port needs to be retrieved */
- addr.SetPort(0);
- }
-
- fd = comm_open(SOCK_STREAM,
- IPPROTO_TCP,
- addr,
- COMM_NONBLOCKING | (fallback ? COMM_REUSEADDR : 0),
- ftpState->entry->url());
- debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd );
-
- if (fd < 0) {
- debugs(9, DBG_CRITICAL, HERE << "comm_open failed");
- return -1;
- }
-
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
- ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false);
-
- if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) {
- comm_close(fd);
- return -1;
- }
-
- ftpState->data.opened(fd, ftpState->dataCloser());
- ftpState->data.port = comm_local_port(fd);
- ftpState->data.host = NULL;
- return fd;
+ ftpState->data.local.SetPort(0);
+ ftpState->data.flags = COMM_NONBLOCKING;
+ }
+
+ ftpState->listenForDataChannel(ftpState->data.fd, ftpState->entry->url());
+ return ftpState->data.fd;
}
/// \ingroup ServerProtocolFTPInternal
@@ -2870,6 +2878,7 @@
debugs(9, 3, HERE);
ftpState->flags.pasv_supported = 0;
fd = ftpOpenListenSocket(ftpState, 0);
+ debugs(9, 3, "Listening for FTP data connection with FD " << fd);
Ip::Address::InitAddrInfo(AI);
@@ -2922,77 +2931,68 @@
*/
void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
{
- char ntoapeer[MAX_IPSTRLEN];
- debugs(9, 3, "ftpAcceptDataConnection");
-
- // one connection accepted. the handler has stopped listening. drop our local pointer to it.
- data.listener = NULL;
+ debugs(9, 3, HERE);
if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
abortTransaction("entry aborted when accepting data conn");
return;
}
+ if (io.flag != COMM_OK) {
+ data.close();
+ debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno));
+ /** \todo Need to send error message on control channel*/
+ ftpFail(this);
+ return;
+ }
+
+ /* data listening conn is no longer even open. abort. */
+ if (data.fd <= 0 || fd_table[data.fd].flags.open == 0) {
+ data.clear(); // ensure that it's cleared and not just closed.
+ return;
+ }
+
/** \par
* When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being
* made by the remote client which is connected to the FTP control socket.
+ * Or the one which we were told to listen for by control channel messages (may differ under NAT).
* This prevents third-party hacks, but also third-party load balancing handshakes.
*/
if (Config.Ftp.sanitycheck) {
+ char ntoapeer[MAX_IPSTRLEN];
io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN);
- if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) {
+ if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 &&
+ strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) {
debugs(9, DBG_IMPORTANT,
"FTP data connection from unexpected server (" <<
io.details.peer << "), expecting " <<
- fd_table[ctrl.fd].ipaddr);
+ fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr);
- /* close the bad soures connection down ASAP. */
+ /* close the bad sources connection down ASAP. */
comm_close(io.nfd);
- /* we are ony accepting once, so need to re-open the listener socket. */
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
- data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+ /* drop the bad connection (io) by ignoring the attempt. */
return;
}
}
- if (io.flag != COMM_OK) {
- debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno));
- /** \todo XXX Need to set error message */
- ftpFail(this);
- return;
- }
-
/**\par
- * Replace the Listen socket with the accepted data socket */
+ * Replace the Listening socket with the accepted data socket */
data.close();
data.opened(io.nfd, dataCloser());
data.port = io.details.peer.GetPort();
- io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN);
+ data.host = xstrdup(fd_table[io.nfd].ipaddr);
debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " <<
"FD " << io.nfd << " to " << io.details.peer << " FD table says: " <<
"ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " <<
"data-peer= " << fd_table[data.fd].ipaddr);
-
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ctrl.fd, -1, nullCall);
-
- typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
- TimeoutDialer, this, FtpStateData::ftpTimeout);
- commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
-
- /*\todo XXX We should have a flag to track connect state...
- * host NULL -> not connected, port == local port
- * host set -> connected, port == remote port
- */
- /* Restart state (SENT_NLST/LIST/RETR) */
- FTP_SM_FUNCS[state] (this);
+ assert(haveControlChannel("ftpAcceptDataConnection"));
+ assert(ctrl.message == NULL);
+
+ // Ctrl channel operations will determine what happens to this data connection
}
/// \ingroup ServerProtocolFTPInternal
@@ -3087,11 +3087,7 @@
/*\par
* When client code is 150 with a hostname, Accept data channel. */
debugs(9, 3, "ftpReadStor: accepting data channel");
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
-
- data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+ listenForDataChannel(data.fd, data.host);
} else {
debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
ftpFail(this);
@@ -3211,34 +3207,16 @@
if (code == 125 || (code == 150 && ftpState->data.host)) {
/* Begin data transfer */
- /* XXX what about Config.Timeout.read? */
+ debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+ ftpState->switchTimeoutToDataChannel();
ftpState->maybeReadVirginBody();
ftpState->state = READING_DATA;
- /*
- * Cancel the timeout on the Control socket and establish one
- * on the data socket
- */
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
return;
} else if (code == 150) {
/* Accept data channel */
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-
- ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
- /*
- * Cancel the timeout on the Control socket and establish one
- * on the data socket
- */
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
- typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
- TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
- commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+ debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+ ftpState->switchTimeoutToDataChannel();
+ ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
return;
} else if (!ftpState->flags.tried_nlst && code > 300) {
ftpSendNlst(ftpState);
@@ -3274,32 +3252,13 @@
if (code == 125 || (code == 150 && ftpState->data.host)) {
/* Begin data transfer */
debugs(9, 3, HERE << "reading data channel");
- /* XXX what about Config.Timeout.read? */
+ ftpState->switchTimeoutToDataChannel();
ftpState->maybeReadVirginBody();
ftpState->state = READING_DATA;
- /*
- * Cancel the timeout on the Control socket and establish one
- * on the data socket
- */
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
} else if (code == 150) {
/* Accept data channel */
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
- ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
- /*
- * Cancel the timeout on the Control socket and establish one
- * on the data socket
- */
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
- typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
- TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
- commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+ ftpState->switchTimeoutToDataChannel();
+ ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
} else if (code >= 300) {
if (!ftpState->flags.try_slash_hack) {
/* Try this as a directory missing trailing slash... */
@@ -3952,6 +3911,13 @@
fd = aFd;
closer = aCloser;
comm_add_close_handler(fd, closer);
+
+ // grab the local IP address:port details for this connection
+ struct addrinfo *AI = NULL;
+ local.InitAddrInfo(AI);
+ getsockname(aFd, AI->ai_addr, &AI->ai_addrlen);
+ local = *AI;
+ local.FreeAddrInfo(AI);
}
/// planned close: removes the close handler and calls comm_close
@@ -3959,15 +3925,11 @@
FtpChannel::close()
{
// channels with active listeners will be closed when the listener handler dies.
- if (listener) {
- delete listener;
- listener = NULL;
- comm_remove_close_handler(fd, closer);
- closer = NULL;
- fd = -1;
- } else if (fd >= 0) {
- comm_remove_close_handler(fd, closer);
- closer = NULL;
+ if (fd >= 0) {
+ if (closer != NULL) {
+ comm_remove_close_handler(fd, closer);
+ closer = NULL;
+ }
comm_close(fd); // we do not expect to be called back
fd = -1;
}
=== modified file 'src/htcp.cc'
--- src/htcp.cc 2010-12-13 11:31:14 +0000
+++ src/htcp.cc 2011-01-09 08:28:49 +0000
@@ -1516,12 +1516,13 @@
AsyncCall::Pointer call = asyncCall(31, 2,
"htcpIncomingConnectionOpened",
HtcpListeningStartedDialer(&htcpIncomingConnectionOpened));
+ Subscription::Pointer nilSub;
Ipc::StartListening(SOCK_DGRAM,
IPPROTO_UDP,
incomingAddr,
COMM_NONBLOCKING,
- Ipc::fdnInHtcpSocket, call);
+ Ipc::fdnInHtcpSocket, call, nilSub);
if (!Config.Addrs.udp_outgoing.IsNoAddr()) {
Ip::Address outgoingAddr = Config.Addrs.udp_outgoing;
=== modified file 'src/icp_v2.cc'
--- src/icp_v2.cc 2010-12-13 11:31:14 +0000
+++ src/icp_v2.cc 2011-01-09 08:30:19 +0000
@@ -698,11 +698,13 @@
"icpIncomingConnectionOpened",
IcpListeningStartedDialer(&icpIncomingConnectionOpened, addr));
+ Subscription::Pointer nilSub;
+
Ipc::StartListening(SOCK_DGRAM,
IPPROTO_UDP,
addr,
COMM_NONBLOCKING,
- Ipc::fdnInIcpSocket, call);
+ Ipc::fdnInIcpSocket, call, nilSub);
addr.SetEmpty(); // clear for next use.
addr = Config.Addrs.udp_outgoing;
=== modified file 'src/ipc/SharedListen.cc'
--- src/ipc/SharedListen.cc 2010-10-28 18:52:59 +0000
+++ src/ipc/SharedListen.cc 2011-01-09 00:21:05 +0000
@@ -150,5 +150,6 @@
Must(cbd);
cbd->fd = fd;
cbd->errNo = response.errNo;
+ cbd->handlerSubscription = por.params.handlerSubscription;
ScheduleCallHere(por.callback);
}
=== modified file 'src/ipc/SharedListen.h'
--- src/ipc/SharedListen.h 2010-07-06 23:09:44 +0000
+++ src/ipc/SharedListen.h 2011-01-09 00:18:50 +0000
@@ -9,6 +9,7 @@
#define SQUID_IPC_SHARED_LISTEN_H
#include "base/AsyncCall.h"
+#include "base/Subscription.h"
namespace Ipc
{
@@ -28,6 +29,9 @@
Ip::Address addr; ///< will be memset and memcopied
int flags;
int fdNote; ///< index into fd_note() comment strings
+
+ /// handler to subscribe to Comm::TcpAcceptor
+ Subscription::Pointer handlerSubscription;
};
class TypedMsgHdr;
=== modified file 'src/ipc/StartListening.cc'
--- src/ipc/StartListening.cc 2010-07-06 23:09:44 +0000
+++ src/ipc/StartListening.cc 2011-01-10 00:52:21 +0000
@@ -6,8 +6,9 @@
*/
#include "config.h"
+#include "base/TextException.h"
#include "comm.h"
-#include "base/TextException.h"
+#include "comm/TcpAcceptor.h"
#include "ipc/SharedListen.h"
#include "ipc/StartListening.h"
@@ -25,34 +26,41 @@
return os << "(FD " << fd << ", err=" << errNo;
}
-
-void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr,
- int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
+void
+Ipc::StartListening(int sock_type, int proto, Ip::Address &addr, int flags,
+ FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub)
{
- OpenListenerParams p;
- p.sock_type = sock_type;
- p.proto = proto;
- p.addr = addr;
- p.flags = flags;
- p.fdNote = fdNote;
-
- if (UsingSmp()) { // if SMP is on, share
+ if (UsingSmp()) { // if SMP is on, share
+ OpenListenerParams p;
+ p.sock_type = sock_type;
+ p.proto = proto;
+ p.addr = addr;
+ p.flags = flags;
+ p.fdNote = fdNote;
+ p.handlerSubscription = sub;
Ipc::JoinSharedListen(p, callback);
return; // wait for the call back
}
+ StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(callback->getDialer());
+ Must(cbd);
+
enter_suid();
- const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
- FdNote(p.fdNote));
- const int errNo = (sock >= 0) ? 0 : errno;
+ if (sock_type == SOCK_STREAM) {
+ // TCP: setup a job to handle accept() with subscribed handler
+ Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(cbd->fd, addr, flags, FdNote(fdNote), sub);
+ cbd->fd = tmp->fd;
+ AsyncJob::Start(tmp);
+ } else if (sock_type == SOCK_DGRAM) {
+ // UDP: setup the listener socket, but do not set a subscriber
+ // TODO: create a UDP sbscription so packet event calls get scheduled and queued Async.
+ cbd->fd = comm_open_listener(sock_type, proto, addr, flags, FdNote(fdNote));
+ } else {
+ fatalf("Invalid Socket Type (%d)",sock_type);
+ }
+ cbd->errNo = cbd->fd >= 0 ? 0 : errno;
leave_suid();
- debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
-
- StartListeningCb *cbd =
- dynamic_cast<StartListeningCb*>(callback->getDialer());
- Must(cbd);
- cbd->fd = sock;
- cbd->errNo = errNo;
+ debugs(54, 3, HERE << "opened listen FD " << cbd->fd << " on " << addr);
ScheduleCallHere(callback);
}
=== modified file 'src/ipc/StartListening.h'
--- src/ipc/StartListening.h 2010-11-21 04:40:05 +0000
+++ src/ipc/StartListening.h 2011-01-09 00:30:35 +0000
@@ -11,6 +11,7 @@
#include "ip/forward.h"
#include "ipc/FdNotes.h"
#include "base/AsyncCall.h"
+#include "base/Subscription.h"
#if HAVE_IOSFWD
#include <iosfwd>
@@ -32,12 +33,15 @@
public:
int fd; ///< opened listening socket or -1
int errNo; ///< errno value from the comm_open_listener() call
+
+ /// The subscription we will pass on to the Comm::TcpAcceptor
+ Subscription::Pointer handlerSubscription;
};
/// Depending on whether SMP is on, either ask Coordinator to send us
/// the listening FD or call comm_open_listener() directly.
-extern void StartListening(int sock_type, int proto, Ip::Address &addr,
- int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
+extern void StartListening(int sock_type, int proto, Ip::Address &addr, int flags,
+ FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub);
} // namespace Ipc;
=== modified file 'src/snmp_core.cc'
--- src/snmp_core.cc 2010-12-13 11:31:14 +0000
+++ src/snmp_core.cc 2011-01-09 08:32:43 +0000
@@ -318,12 +318,13 @@
AsyncCall::Pointer call = asyncCall(49, 2,
"snmpIncomingConnectionOpened",
SnmpListeningStartedDialer(&snmpIncomingConnectionOpened));
+ Subscription::Pointer nilSub;
Ipc::StartListening(SOCK_DGRAM,
IPPROTO_UDP,
Config.Addrs.snmp_incoming,
COMM_NONBLOCKING,
- Ipc::fdnInSnmpSocket, call);
+ Ipc::fdnInSnmpSocket, call, nilSub);
if (!Config.Addrs.snmp_outgoing.IsNoAddr()) {
Config.Addrs.snmp_outgoing.SetPort(Config.Port.snmp);
@@ -339,12 +340,13 @@
AsyncCall::Pointer call = asyncCall(49, 2,
"snmpOutgoingConnectionOpened",
SnmpListeningStartedDialer(&snmpOutgoingConnectionOpened));
+ Subscription::Pointer nilSub;
Ipc::StartListening(SOCK_DGRAM,
IPPROTO_UDP,
Config.Addrs.snmp_outgoing,
COMM_NONBLOCKING,
- Ipc::fdnOutSnmpSocket, call);
+ Ipc::fdnOutSnmpSocket, call, nilSub);
}
}
}