This patch adds a Comm::Write API for accepting SBuf output buffers.
Unlike the MemBuf API it does not deallocate the backing store once written.
Unlike the char* API the buffer can be appended to before write(2) is
completed and the appended bytes will be handled as part of the original
write request.
All other behaviour is identical between the three Comm::Write APIs.
To get it in action and tested the main tunnel.cc I/O
"data pipe" buffers are converted to SBuf. Initial testing with a few
hundred CONNECT requests in real traffic shows no issues. We even get
rid of one potential data copy when passing the initial client read
buffer in from ConnStateData.
NP: at this point no effort is made to take advantage of the
early-append possibility.
The buffers behind the setup request/response handling are also left as
MemBuf due to HttpMsg::parse() and packerToMemInit() APIs missing SBuf
support.
Amos
=== modified file 'src/comm/Write.cc'
--- src/comm/Write.cc 2014-02-21 10:46:19 +0000
+++ src/comm/Write.cc 2014-05-11 15:03:02 +0000
@@ -1,118 +1,145 @@
#include "squid.h"
#include "comm/Connection.h"
#include "comm/IoCallback.h"
#include "comm/Write.h"
#include "fd.h"
#include "fde.h"
#include "globals.h"
#include "MemBuf.h"
#include "profiler/Profiler.h"
+#include "SBuf.h"
#include "SquidTime.h"
#include "StatCounters.h"
#if USE_DELAY_POOLS
#include "ClientInfo.h"
#endif
#include <cerrno>
void
+Comm::Write(const Comm::ConnectionPointer &conn, SBuf &sb, AsyncCall::Pointer
&callback)
+{
+ debugs(5, 5, conn << ": sz " << sb.length() << ": asynCall " << callback);
+
+ /* Make sure we are open, not closing, and not writing */
+ assert(fd_table[conn->fd].flags.open);
+ assert(!fd_table[conn->fd].closing());
+ Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
+ assert(!ccb->active());
+
+ fd_table[conn->fd].writeStart = squid_curtime;
+ ccb->conn = conn;
+ /* Queue the write */
+ ccb->setCallback(IOCB_WRITE, callback, NULL, NULL, sb.length());
+ ccb->buf2 = &sb;
+ ccb->selectOrQueueWrite();
+}
+
+/// \deprecated use SBuf for I/O buffer instead
+void
Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb,
AsyncCall::Pointer &callback)
{
Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc());
}
+/// \deprecated use SBuf for I/O buffer instead
void
Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size,
AsyncCall::Pointer &callback, FREE * free_func)
{
debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
/* Make sure we are open, not closing, and not writing */
assert(fd_table[conn->fd].flags.open);
assert(!fd_table[conn->fd].closing());
Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
assert(!ccb->active());
fd_table[conn->fd].writeStart = squid_curtime;
ccb->conn = conn;
/* Queue the write */
ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
ccb->selectOrQueueWrite();
}
/** Write to FD.
* This function is used by the lowest level of IO loop which only has access
to FD numbers.
* We have to use the comm iocb_table to map FD numbers to waiting data and
Comm::Connections.
* Once the write has been concluded we schedule the waiting call with
success/fail results.
*/
void
Comm::HandleWrite(int fd, void *data)
{
Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
int len = 0;
int nleft;
assert(state->conn != NULL && state->conn->fd == fd);
PROF_start(commHandleWrite);
- debugs(5, 5, HERE << state->conn << ": off " <<
- (long int) state->offset << ", sz " << (long int) state->size <<
".");
- nleft = state->size - state->offset;
+ debugs(5, 5, state->conn << ": off " << state->offset << ", sz " <<
state->size);
+ if (state->buf2)
+ nleft = state->buf2->length();
+ else
+ nleft = state->size - state->offset;
#if USE_DELAY_POOLS
ClientInfo * clientInfo=fd_table[fd].clientInfo;
if (clientInfo && !clientInfo->writeLimitingActive)
clientInfo = NULL; // we only care about quota limits here
if (clientInfo) {
assert(clientInfo->selectWaiting);
clientInfo->selectWaiting = false;
assert(clientInfo->hasQueue());
assert(clientInfo->quotaPeekFd() == fd);
clientInfo->quotaDequeue(); // we will write or requeue below
if (nleft > 0) {
const int quota = clientInfo->quotaForDequed();
if (!quota) { // if no write quota left, queue this fd
state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
clientInfo->kickQuotaQueue();
PROF_stop(commHandleWrite);
return;
}
const int nleft_corrected = min(nleft, quota);
if (nleft != nleft_corrected) {
debugs(5, 5, HERE << state->conn << " writes only " <<
nleft_corrected << " out of " << nleft);
nleft = nleft_corrected;
}
}
}
#endif /* USE_DELAY_POOLS */
/* actually WRITE data */
- len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
- debugs(5, 5, HERE << "write() returns " << len);
+ if (state->buf2)
+ len = FD_WRITE_METHOD(fd, state->buf2->rawContent(), nleft);
+ else
+ len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+ debugs(5, 5, "write() returns " << len);
#if USE_DELAY_POOLS
if (clientInfo) {
if (len > 0) {
/* we wrote data - drain them from bucket */
clientInfo->bucketSize -= len;
if (clientInfo->bucketSize < 0.0) {
debugs(5, DBG_IMPORTANT, HERE << "drained too much"); //
should not happen
clientInfo->bucketSize = 0;
}
}
// even if we wrote nothing, we were served; give others a chance
clientInfo->kickQuotaQueue();
}
#endif /* USE_DELAY_POOLS */
fd_bytes(fd, len, FD_WRITE);
++statCounter.syscalls.sock.writes;
// After each successful partial write,
@@ -124,31 +151,33 @@
/* We're done */
if (nleft != 0)
debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure:
connection closed with " << nleft << " bytes remaining.");
state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
} else if (len < 0) {
/* An error */
if (fd_table[fd].flags.socket_eof) {
debugs(50, 2, HERE << "FD " << fd << " write failure: " <<
xstrerror() << ".");
state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
} else if (ignoreErrno(errno)) {
debugs(50, 9, HERE << "FD " << fd << " write failure: " <<
xstrerror() << ".");
state->selectOrQueueWrite();
} else {
debugs(50, 2, HERE << "FD " << fd << " write failure: " <<
xstrerror() << ".");
state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
}
} else {
/* A successful write, continue */
state->offset += len;
+ if (state->buf2)
+ state->buf2->consume(len);
if (state->offset < state->size) {
/* Not done, reinstall the write handler and write some more */
state->selectOrQueueWrite();
} else {
state->finish(nleft ? COMM_OK : COMM_ERROR, errno);
}
}
PROF_stop(commHandleWrite);
}
=== modified file 'src/comm/Write.h'
--- src/comm/Write.h 2012-08-14 11:53:07 +0000
+++ src/comm/Write.h 2014-05-11 14:47:10 +0000
@@ -1,34 +1,44 @@
#ifndef _SQUID_COMM_IOWRITE_H
#define _SQUID_COMM_IOWRITE_H
#include "base/AsyncCall.h"
#include "comm/forward.h"
#include "typedefs.h"
class MemBuf;
+class SBuf;
+
namespace Comm
{
/**
* Queue a write. callback is scheduled when the write
* completes, on error, or on file descriptor close.
*
* free_func is used to free the passed buffer when the write has completed.
*/
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size,
AsyncCall::Pointer &callback, FREE *free_func);
/**
* Queue a write. callback is scheduled when the write
* completes, on error, or on file descriptor close.
*/
void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer
&callback);
+/**
+ * Queue a write for an SBuf contents. The SBuf content is consume()'d as it
is written.
+ * callback is scheduled when the SBuf is emptied, on error, or on file
descriptor close.
+ * If the SBuf is added to while write(2) are ongoing the additional bytes
will also be
+ * written before the callback is scheduled.
+ */
+void Write(const Comm::ConnectionPointer &conn, SBuf &sb, AsyncCall::Pointer
&callback);
+
/// Cancel the write pending on FD. No action if none pending.
void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
// callback handler to process an FD which is available for writing.
extern PF HandleWrite;
} // namespace Comm
#endif /* _SQUID_COMM_IOWRITE_H */
=== modified file 'src/tunnel.cc'
--- src/tunnel.cc 2014-05-07 14:40:05 +0000
+++ src/tunnel.cc 2014-05-11 16:56:00 +0000
@@ -69,108 +69,96 @@
* then shuffling binary data between the resulting FD pair.
*/
/*
* TODO 1: implement a read/write API on ConnStateData to send/receive blocks
* of pre-formatted data. Then we can use that as the client side of the tunnel
* instead of re-implementing it here and occasionally getting the
ConnStateData
* read/write state wrong.
*
* TODO 2: then convert this into a AsyncJob, possibly a child of 'Server'
*/
class TunnelStateData
{
public:
TunnelStateData();
~TunnelStateData();
TunnelStateData(const TunnelStateData &); // do not implement
TunnelStateData &operator =(const TunnelStateData &); // do not implement
class Connection;
- static void ReadClient(const Comm::ConnectionPointer &, char *buf, size_t
len, comm_err_t errcode, int xerrno, void *data);
- static void ReadServer(const Comm::ConnectionPointer &, char *buf, size_t
len, comm_err_t errcode, int xerrno, void *data);
- static void WriteClientDone(const Comm::ConnectionPointer &, char *buf,
size_t len, comm_err_t flag, int xerrno, void *data);
- static void WriteServerDone(const Comm::ConnectionPointer &, char *buf,
size_t len, comm_err_t flag, int xerrno, void *data);
+ static void ReadClient(const Comm::ConnectionPointer &, char *, size_t
len, comm_err_t errcode, int xerrno, void *data);
+ static void ReadServer(const Comm::ConnectionPointer &, char *, size_t
len, comm_err_t errcode, int xerrno, void *data);
+ static void WriteClientDone(const Comm::ConnectionPointer &, char *,
size_t len, comm_err_t flag, int xerrno, void *data);
+ static void WriteServerDone(const Comm::ConnectionPointer &, char *,
size_t len, comm_err_t flag, int xerrno, void *data);
/// Starts reading peer response to our CONNECT request.
void readConnectResponse();
/// Called when we may be done handling a CONNECT exchange with the peer.
void connectExchangeCheckpoint();
bool noConnections() const;
char *url;
CbcPointer<ClientHttpRequest> http;
HttpRequest::Pointer request;
AccessLogEntryPointer al;
Comm::ConnectionList serverDestinations;
const char * getHost() const {
return (server.conn != NULL && server.conn->getPeer() ?
server.conn->getPeer()->host : request->GetHost());
};
/// Whether we are writing a CONNECT request to a peer.
bool waitingForConnectRequest() const { return connectReqWriting; }
/// Whether we are reading a CONNECT response from a peer.
bool waitingForConnectResponse() const { return connectRespBuf; }
/// Whether we are waiting for the CONNECT request/response exchange with
the peer.
bool waitingForConnectExchange() const { return waitingForConnectRequest()
|| waitingForConnectResponse(); }
/// Whether the client sent a CONNECT request to us.
bool clientExpectsConnectResponse() const {
return !(request != NULL &&
(request->flags.interceptTproxy ||
request->flags.intercepted));
}
class Connection
{
public:
- Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)),
size_ptr(NULL) {}
-
- ~Connection();
+ Connection() : size_ptr(NULL) {}
int bytesWanted(int lower=0, int upper = INT_MAX) const;
- void bytesIn(int const &);
-#if USE_DELAY_POOLS
-
- void setDelayId(DelayId const &);
-#endif
-
void error(int const xerrno);
int debugLevelForError(int const xerrno) const;
/// handles a non-I/O error associated with this Connection
void logicError(const char *errMsg);
void closeIfOpen();
void dataSent (size_t amount);
- int len;
- char *buf;
+ SBuf buf;
int64_t *size_ptr; /* pointer to size in an ConnStateData
for logging */
Comm::ConnectionPointer conn; ///< The currently connected
connection.
- private:
#if USE_DELAY_POOLS
-
DelayId delayId;
#endif
-
};
Connection client, server;
int *status_ptr; /* pointer to status for logging */
MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we
need it
bool connectReqWriting; ///< whether we are writing a CONNECT request to a
peer
void copyRead(Connection &from, IOCB *completion);
/// continue to set up connection to a peer, going async for SSL peers
void connectToPeer();
private:
#if USE_OPENSSL
/// Gives PeerConnector access to Answer in the TunnelStateData callback
dialer.
class MyAnswerDialer: public CallDialer, public
Ssl::PeerConnector::CbDialer
{
public:
typedef void (TunnelStateData::*Method)(Ssl::PeerConnectorAnswer &);
@@ -181,208 +169,196 @@
virtual bool canDial(AsyncCall &call) { return tunnel_.valid(); }
void dial(AsyncCall &call) { ((&(*tunnel_))->*method_)(answer_); }
virtual void print(std::ostream &os) const {
os << '(' << tunnel_.get() << ", " << answer_ << ')';
}
/* Ssl::PeerConnector::CbDialer API */
virtual Ssl::PeerConnectorAnswer &answer() { return answer_; }
private:
Method method_;
CbcPointer<TunnelStateData> tunnel_;
Ssl::PeerConnectorAnswer answer_;
};
void connectedToPeer(Ssl::PeerConnectorAnswer &answer);
#endif
CBDATA_CLASS2(TunnelStateData);
bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno,
Connection &from, Connection &to);
- void copy(size_t len, Connection &from, Connection &to, IOCB *);
+ void copy(Connection &from, Connection &to, IOCB *);
void handleConnectResponse(const size_t chunkSize);
- void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno);
- void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno);
- void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno);
- void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno);
+ void readServer(size_t len, comm_err_t errcode, int xerrno);
+ void readClient(size_t len, comm_err_t errcode, int xerrno);
+ void writeClientDone(size_t len, comm_err_t flag, int xerrno);
+ void writeServerDone(size_t len, comm_err_t flag, int xerrno);
static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char
*buf, size_t len, comm_err_t errcode, int xerrno, void *data);
void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode,
int xerrno);
};
static const char *const conn_established = "HTTP/1.1 200 Connection
established\r\n\r\n";
static CNCB tunnelConnectDone;
static ERCB tunnelErrorComplete;
static CLCB tunnelServerClosed;
static CLCB tunnelClientClosed;
static CTCB tunnelTimeout;
static PSC tunnelPeerSelectComplete;
static void tunnelConnected(const Comm::ConnectionPointer &server, void *);
static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server,
void *);
static void
tunnelServerClosed(const CommCloseCbParams ¶ms)
{
TunnelStateData *tunnelState = (TunnelStateData *)params.data;
debugs(26, 3, HERE << tunnelState->server.conn);
tunnelState->server.conn = NULL;
if (tunnelState->noConnections()) {
delete tunnelState;
return;
}
- if (!tunnelState->server.len) {
+ if (tunnelState->server.buf.isEmpty()) {
tunnelState->client.conn->close();
return;
}
}
static void
tunnelClientClosed(const CommCloseCbParams ¶ms)
{
TunnelStateData *tunnelState = (TunnelStateData *)params.data;
debugs(26, 3, HERE << tunnelState->client.conn);
tunnelState->client.conn = NULL;
if (tunnelState->noConnections()) {
delete tunnelState;
return;
}
- if (!tunnelState->client.len) {
+ if (tunnelState->client.buf.isEmpty()) {
tunnelState->server.conn->close();
return;
}
}
TunnelStateData::TunnelStateData() :
url(NULL),
http(),
request(NULL),
status_ptr(NULL),
connectRespBuf(NULL),
connectReqWriting(false)
{
debugs(26, 3, "TunnelStateData constructed this=" << this);
}
TunnelStateData::~TunnelStateData()
{
debugs(26, 3, "TunnelStateData destructed this=" << this);
assert(noConnections());
xfree(url);
serverDestinations.clear();
delete connectRespBuf;
}
-TunnelStateData::Connection::~Connection()
-{
- safe_free(buf);
-}
-
int
TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const
{
#if USE_DELAY_POOLS
return delayId.bytesWanted(lowerbound, upperbound);
#else
return upperbound;
#endif
}
-void
-TunnelStateData::Connection::bytesIn(int const &count)
-{
- debugs(26, 3, HERE << "len=" << len << " + count=" << count);
-#if USE_DELAY_POOLS
- delayId.bytesIn(count);
-#endif
-
- len += count;
-}
-
int
TunnelStateData::Connection::debugLevelForError(int const xerrno) const
{
#ifdef ECONNRESET
if (xerrno == ECONNRESET)
return 2;
#endif
if (ignoreErrno(xerrno))
return 3;
return 1;
}
/* Read from server side and queue it for writing to the client */
void
-TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *buf,
size_t len, comm_err_t errcode, int xerrno, void *data)
+TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *, size_t
len, comm_err_t errcode, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
assert(cbdataReferenceValid(tunnelState));
debugs(26, 3, HERE << c);
- tunnelState->readServer(buf, len, errcode, xerrno);
+ tunnelState->readServer(len, errcode, xerrno);
}
void
-TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int
xerrno)
+TunnelStateData::readServer(size_t len, comm_err_t errcode, int xerrno)
{
debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" <<
errcode);
/*
* Bail out early on COMM_ERR_CLOSING
* - close handlers will tidy up for us
*/
if (errcode == COMM_ERR_CLOSING)
return;
if (len > 0) {
- server.bytesIn(len);
+#if USE_DELAY_POOLS
+ server.delayId.bytesIn(len);
+#endif
kb_incr(&(statCounter.server.all.kbytes_in), len);
kb_incr(&(statCounter.server.other.kbytes_in), len);
}
if (keepGoingAfterRead(len, errcode, xerrno, server, client))
- copy(len, server, client, WriteClientDone);
+ copy(server, client, WriteClientDone);
}
/// Called when we read [a part of] CONNECT response from the peer
void
TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t
errcode, int xerrno)
{
debugs(26, 3, server.conn << ", read " << len << " bytes, err=" <<
errcode);
assert(waitingForConnectResponse());
if (errcode == COMM_ERR_CLOSING)
return;
if (len > 0) {
connectRespBuf->appended(len);
- server.bytesIn(len);
+#if USE_DELAY_POOLS
+ server.delayId.bytesIn(len);
+#endif
kb_incr(&(statCounter.server.all.kbytes_in), len);
kb_incr(&(statCounter.server.other.kbytes_in), len);
}
if (keepGoingAfterRead(len, errcode, xerrno, server, client))
handleConnectResponse(len);
}
/* Read from client side and queue it for writing to the server */
void
TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char
*buf, size_t len, comm_err_t errcode, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
assert (cbdataReferenceValid (tunnelState));
tunnelState->readConnectResponseDone(buf, len, errcode, xerrno);
}
/// Parses [possibly incomplete] CONNECT response and reacts to it.
/// If the tunnel is being closed or more response data is needed, returns
false.
@@ -417,230 +393,226 @@
server.logicError("huge CONNECT response from peer");
return;
}
// keep reading
readConnectResponse();
return;
}
// CONNECT response was successfully parsed
*status_ptr = rep.sline.status();
// bail if we did not get an HTTP 200 (Connection Established) response
if (rep.sline.status() != Http::scOkay) {
server.logicError("unsupported CONNECT response status code");
return;
}
if (rep.hdr_sz < connectRespBuf->contentSize()) {
// preserve bytes that the server already sent after the CONNECT
response
- server.len = connectRespBuf->contentSize() - rep.hdr_sz;
- memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len);
- } else {
- // reset; delay pools were using this field to throttle CONNECT
response
- server.len = 0;
+ server.buf.append(connectRespBuf->content()+rep.hdr_sz,
connectRespBuf->contentSize() - rep.hdr_sz);
}
delete connectRespBuf;
connectRespBuf = NULL;
connectExchangeCheckpoint();
}
void
TunnelStateData::Connection::logicError(const char *errMsg)
{
debugs(50, 3, conn << " closing on error: " << errMsg);
conn->close();
}
void
TunnelStateData::Connection::error(int const xerrno)
{
/* XXX fixme xstrerror and xerrno... */
errno = xerrno;
debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write
failure: " << xstrerror());
if (!ignoreErrno(xerrno))
conn->close();
}
/* Read from client side and queue it for writing to the server */
void
-TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t
len, comm_err_t errcode, int xerrno, void *data)
+TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *, size_t
len, comm_err_t errcode, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
assert (cbdataReferenceValid (tunnelState));
- tunnelState->readClient(buf, len, errcode, xerrno);
+ tunnelState->readClient(len, errcode, xerrno);
}
void
-TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int
xerrno)
+TunnelStateData::readClient(size_t len, comm_err_t errcode, int xerrno)
{
debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" <<
errcode);
/*
* Bail out early on COMM_ERR_CLOSING
* - close handlers will tidy up for us
*/
if (errcode == COMM_ERR_CLOSING)
return;
if (len > 0) {
- client.bytesIn(len);
+#if USE_DELAY_POOLS
+ client.delayId.bytesIn(len);
+#endif
kb_incr(&(statCounter.client_http.kbytes_in), len);
}
if (keepGoingAfterRead(len, errcode, xerrno, client, server))
- copy(len, client, server, WriteServerDone);
+ copy(client, server, WriteServerDone);
}
/// Updates state after reading from client or server.
/// Returns whether the caller should use the data just read.
bool
TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int
xerrno, Connection &from, Connection &to)
{
debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn <<
"}");
/* I think this is to prevent free-while-in-a-callback behaviour
* - RBC 20030229
* from.conn->close() / to.conn->close() done here trigger close callbacks
which may free TunnelStateData
*/
const CbcPointer<TunnelStateData> safetyLock(this);
/* Bump the source connection read timeout on any activity */
if (Comm::IsConnOpen(from.conn)) {
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
CommTimeoutCbPtrFun(tunnelTimeout,
this));
commSetConnTimeout(from.conn, Config.Timeout.read, timeoutCall);
}
/* Bump the dest connection read timeout on any activity */
/* see Bug 3659: tunnels can be weird, with very long one-way transfers */
if (Comm::IsConnOpen(to.conn)) {
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
CommTimeoutCbPtrFun(tunnelTimeout,
this));
commSetConnTimeout(to.conn, Config.Timeout.read, timeoutCall);
}
if (errcode)
from.error (xerrno);
else if (len == 0 || !Comm::IsConnOpen(to.conn)) {
debugs(26, 3, HERE << "Nothing to write or client gone. Terminate the
tunnel.");
from.conn->close();
/* Only close the remote end if we've finished queueing data to it */
- if (from.len == 0 && Comm::IsConnOpen(to.conn) ) {
+ if (from.buf.isEmpty() && Comm::IsConnOpen(to.conn) ) {
to.conn->close();
}
} else if (cbdataReferenceValid(this)) {
return true;
}
return false;
}
void
-TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB
*completion)
+TunnelStateData::copy(Connection &from, Connection &to, IOCB *completion)
{
debugs(26, 3, HERE << "Schedule Write");
AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler",
CommIoCbPtrFun(completion, this));
- Comm::Write(to.conn, from.buf, len, call, NULL);
+ Comm::Write(to.conn, from.buf, call);
}
/* Writes data from the client buffer to the server side */
void
-TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *buf,
size_t len, comm_err_t flag, int xerrno, void *data)
+TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *,
size_t len, comm_err_t flag, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
assert (cbdataReferenceValid (tunnelState));
- tunnelState->writeServerDone(buf, len, flag, xerrno);
+ tunnelState->writeServerDone(len, flag, xerrno);
}
void
-TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int
xerrno)
+TunnelStateData::writeServerDone(size_t len, comm_err_t flag, int xerrno)
{
debugs(26, 3, HERE << server.conn << ", " << len << " bytes written,
flag=" << flag);
/* Error? */
if (flag != COMM_OK) {
if (flag != COMM_ERR_CLOSING) {
debugs(26, 4, HERE << "calling TunnelStateData::server.error(" <<
xerrno <<")");
server.error(xerrno); // may call comm_close
}
return;
}
/* EOF? */
if (len == 0) {
debugs(26, 4, HERE << "No read input. Closing server connection.");
server.conn->close();
return;
}
/* Valid data */
kb_incr(&(statCounter.server.all.kbytes_out), len);
kb_incr(&(statCounter.server.other.kbytes_out), len);
client.dataSent(len);
/* If the other end has closed, so should we */
if (!Comm::IsConnOpen(client.conn)) {
debugs(26, 4, HERE << "Client gone away. Shutting down server
connection.");
server.conn->close();
return;
}
const CbcPointer<TunnelStateData> safetyLock(this); /* ??? should
be locked by the caller... */
if (cbdataReferenceValid(this))
copyRead(client, ReadClient);
}
/* Writes data from the server buffer to the client side */
void
-TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *buf,
size_t len, comm_err_t flag, int xerrno, void *data)
+TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *,
size_t len, comm_err_t flag, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
assert (cbdataReferenceValid (tunnelState));
- tunnelState->writeClientDone(buf, len, flag, xerrno);
+ tunnelState->writeClientDone(len, flag, xerrno);
}
void
TunnelStateData::Connection::dataSent(size_t amount)
{
- debugs(26, 3, HERE << "len=" << len << " - amount=" << amount);
- assert(amount == (size_t)len);
- len =0;
- /* increment total object size */
+ debugs(26, 3, amount << " bytes written");
+ /* increment total object size */
if (size_ptr)
*size_ptr += amount;
}
void
-TunnelStateData::writeClientDone(char *buf, size_t len, comm_err_t flag, int
xerrno)
+TunnelStateData::writeClientDone(size_t len, comm_err_t flag, int xerrno)
{
debugs(26, 3, HERE << client.conn << ", " << len << " bytes written,
flag=" << flag);
/* Error? */
if (flag != COMM_OK) {
if (flag != COMM_ERR_CLOSING) {
debugs(26, 4, HERE << "Closing client connection due to comm
flags.");
client.error(xerrno); // may call comm_close
}
return;
}
/* EOF? */
if (len == 0) {
debugs(26, 4, HERE << "Closing client connection due to 0 byte read.");
client.conn->close();
return;
}
/* Valid data */
@@ -665,86 +637,86 @@
{
TunnelStateData *tunnelState = static_cast<TunnelStateData *>(io.data);
debugs(26, 3, HERE << io.conn);
/* Temporary lock to protect our own feets (comm_close ->
tunnelClientClosed -> Free) */
CbcPointer<TunnelStateData> safetyLock(tunnelState);
tunnelState->client.closeIfOpen();
tunnelState->server.closeIfOpen();
}
void
TunnelStateData::Connection::closeIfOpen()
{
if (Comm::IsConnOpen(conn))
conn->close();
}
void
TunnelStateData::copyRead(Connection &from, IOCB *completion)
{
- assert(from.len == 0);
AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler",
CommIoCbPtrFun(completion, this));
- comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF),
call);
+ from.buf.reserveSpace(from.bytesWanted(1, SQUID_TCP_SO_RCVBUF));
+ comm_read(from.conn, from.buf, call);
}
void
TunnelStateData::readConnectResponse()
{
assert(waitingForConnectResponse());
AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone",
CommIoCbPtrFun(ReadConnectResponseDone, this));
comm_read(server.conn, connectRespBuf->space(),
server.bytesWanted(1, connectRespBuf->spaceSize()), call);
}
/**
* Set the HTTP status for this request and sets the read handlers for client
* and server side connections.
*/
static void
tunnelStartShoveling(TunnelStateData *tunnelState)
{
assert(!tunnelState->waitingForConnectExchange());
*tunnelState->status_ptr = Http::scOkay;
if (cbdataReferenceValid(tunnelState)) {
// Shovel any payload already pushed into reply buffer by the server
response
- if (!tunnelState->server.len)
+ if (!tunnelState->server.buf.length())
tunnelState->copyRead(tunnelState->server,
TunnelStateData::ReadServer);
else {
- debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << Raw("",
tunnelState->server.buf, tunnelState->server.len) << "\n----------");
- tunnelState->copy(tunnelState->server.len, tunnelState->server,
tunnelState->client, TunnelStateData::WriteClientDone);
+ debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" <<
tunnelState->server.buf << "\n----------");
+ tunnelState->copy(tunnelState->server, tunnelState->client,
TunnelStateData::WriteClientDone);
}
// Bug 3371: shovel any payload already pushed into ConnStateData by
the client request
if (tunnelState->http.valid() && tunnelState->http->getConn() &&
!tunnelState->http->getConn()->in.buf.isEmpty()) {
struct ConnStateData::In *in = &tunnelState->http->getConn()->in;
debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << in->buf
<< "\n----------");
// We just need to ensure the bytes from ConnStateData are in
client.buf already to deliver
- memcpy(tunnelState->client.buf, in->buf.rawContent(),
in->buf.length());
- // NP: readClient() takes care of buffer length accounting.
- tunnelState->readClient(tunnelState->client.buf, in->buf.length(),
COMM_OK, 0);
+ tunnelState->client.buf = in->buf;
in->buf.consume(); // ConnStateData buffer accounting after the
shuffle.
+ // NP: readClient() takes care of buffer length accounting.
+ tunnelState->readClient(tunnelState->client.buf.length(), COMM_OK,
0);
} else
tunnelState->copyRead(tunnelState->client,
TunnelStateData::ReadClient);
}
}
/**
* All the pieces we need to write to client and/or server connection
* have been written.
* Call the tunnelStartShoveling to start the blind pump.
*/
static void
tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf,
size_t size, comm_err_t flag, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
debugs(26, 3, HERE << conn << ", flag=" << flag);
if (flag != COMM_OK) {
*tunnelState->status_ptr = Http::scInternalServerError;
tunnelErrorComplete(conn->fd, data, 0);
return;
@@ -838,41 +810,41 @@
Comm::ConnOpener *cs = new
Comm::ConnOpener(tunnelState->serverDestinations[0], call,
Config.Timeout.connect);
cs->setHost(tunnelState->url);
AsyncJob::Start(cs);
} else {
debugs(26, 4, HERE << "terminate with error.");
ErrorState *err = new ErrorState(ERR_CONNECT_FAIL,
Http::scServiceUnavailable, tunnelState->request.getRaw());
*tunnelState->status_ptr = Http::scServiceUnavailable;
err->xerrno = xerrno;
// on timeout is this still: err->xerrno = ETIMEDOUT;
err->port = conn->remote.port();
err->callback = tunnelErrorComplete;
err->callback_data = tunnelState;
errorSend(tunnelState->client.conn, err);
}
return;
}
#if USE_DELAY_POOLS
/* no point using the delayIsNoDelay stuff since tunnel is nice and simple
*/
if (conn->getPeer() && conn->getPeer()->options.no_delay)
- tunnelState->server.setDelayId(DelayId());
+ tunnelState->server.delayId = DelayId();
#endif
tunnelState->request->hier.note(conn, tunnelState->getHost());
tunnelState->server.conn = conn;
tunnelState->request->peer_host = conn->getPeer() ? conn->getPeer()->host
: NULL;
comm_add_close_handler(conn->fd, tunnelServerClosed, tunnelState);
debugs(26, 4, HERE << "determine post-connect handling pathway.");
if (conn->getPeer()) {
tunnelState->request->peer_login = conn->getPeer()->login;
tunnelState->request->flags.proxying =
!(conn->getPeer()->options.originserver);
} else {
tunnelState->request->peer_login = NULL;
tunnelState->request->flags.proxying = false;
}
if (tunnelState->request->flags.proxying)
tunnelState->connectToPeer();
else {
@@ -906,41 +878,41 @@
* default is to allow.
*/
ACLFilledChecklist ch(Config.accessList.miss, request, NULL);
ch.src_addr = request->client_addr;
ch.my_addr = request->my_addr;
if (ch.fastCheck() == ACCESS_DENIED) {
debugs(26, 4, HERE << "MISS access forbidden.");
err = new ErrorState(ERR_FORWARDING_DENIED, Http::scForbidden,
request);
*status_ptr = Http::scForbidden;
errorSend(http->getConn()->clientConnection, err);
return;
}
}
debugs(26, 3, request->method << ' ' << url << ' ' << request->http_ver);
++statCounter.server.all.requests;
++statCounter.server.other.requests;
tunnelState = new TunnelStateData;
#if USE_DELAY_POOLS
- tunnelState->server.setDelayId(DelayId::DelayClient(http));
+ tunnelState->server.delayId = DelayId::DelayClient(http);
#endif
tunnelState->url = xstrdup(url);
tunnelState->request = request;
tunnelState->server.size_ptr = size_ptr;
tunnelState->status_ptr = status_ptr;
tunnelState->client.conn = http->getConn()->clientConnection;
tunnelState->http = http;
tunnelState->al = al;
comm_add_close_handler(tunnelState->client.conn->fd,
tunnelClientClosed,
tunnelState);
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
CommTimeoutCbPtrFun(tunnelTimeout,
tunnelState));
commSetConnTimeout(tunnelState->client.conn, Config.Timeout.lifetime,
timeoutCall);
peerSelect(&(tunnelState->serverDestinations), request, al,
NULL,
tunnelPeerSelectComplete,
@@ -1063,29 +1035,20 @@
delete err;
GetMarkingsToServer(tunnelState->request.getRaw(),
*tunnelState->serverDestinations[0]);
debugs(26, 3, HERE << "paths=" << peer_paths->size() << ", p[0]={" <<
(*peer_paths)[0] << "}, serverDest[0]={" <<
tunnelState->serverDestinations[0] << "}");
AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone",
CommConnectCbPtrFun(tunnelConnectDone, tunnelState));
Comm::ConnOpener *cs = new
Comm::ConnOpener(tunnelState->serverDestinations[0], call,
Config.Timeout.connect);
cs->setHost(tunnelState->url);
AsyncJob::Start(cs);
}
CBDATA_CLASS_INIT(TunnelStateData);
bool
TunnelStateData::noConnections() const
{
return !Comm::IsConnOpen(server.conn) && !Comm::IsConnOpen(client.conn);
}
-
-#if USE_DELAY_POOLS
-void
-TunnelStateData::Connection::setDelayId(DelayId const &newDelay)
-{
- delayId = newDelay;
-}
-
-#endif