This one with the mk2 patch actually attached.
Amos
=== modified file 'src/comm/Makefile.am'
--- src/comm/Makefile.am 2012-03-29 09:22:41 +0000
+++ src/comm/Makefile.am 2013-10-16 12:14:31 +0000
@@ -23,6 +23,8 @@
ModSelectWin32.cc \
TcpAcceptor.cc \
TcpAcceptor.h \
+ TcpReceiver.cc \
+ TcpReceiver.h \
UdpOpenDialer.h \
Write.cc \
Write.h \
=== added file 'src/comm/TcpReceiver.cc'
--- src/comm/TcpReceiver.cc 1970-01-01 00:00:00 +0000
+++ src/comm/TcpReceiver.cc 2014-01-05 13:21:33 +0000
@@ -0,0 +1,263 @@
+/*
+ * DEBUG: section 05 TCP Read
+ *
+ * - level 2 minor TCP errors
+ * - level 3 duplicate reasons for halting I/O (bugs? only need to halt once)
+ * - level 4 reasons for errors and halting I/O
+ * - level 5 common I/O and buffer activity
+ */
+#include "squid.h"
+#include "comm.h"
+#include "comm/TcpReceiver.h"
+#include "Debug.h"
+#include "fd.h"
+#include "fde.h"
+#include "StatCounters.h"
+#include "tools.h"
+
+Comm::TcpReceiver::TcpReceiver(const Comm::ConnectionPointer &c) :
+ AsyncJob("Comm::TcpReceiver"),
+ tcp(c),
+ stoppedReceiving_(NULL), // nil: no reason to stop reading has been recorded
+ stoppedSending_(NULL), // nil: no reason to stop writing has been recorded
+ closed_(), // stays nil until tcpConnectionInit() registers the close handler
+ reader_() // nil: no read callback is pending, so reading() is false
+{}
+
+void
+Comm::TcpReceiver::tcpConnectionInit()
+{
+ typedef CommCbMemFunT<Comm::TcpReceiver, CommCloseCbParams> Dialer;
+ closed_ = JobCallback(33, 5, Dialer, this, Comm::TcpReceiver::tcpConnectionClosed);
+ comm_add_close_handler(tcp->fd, closed_); // closed_ is kept so it can be cancelled or removed later
+}
+
+bool
+Comm::TcpReceiver::doneAll() const
+{
+ return stoppedSending() && stoppedReceiving() && !inBuf.hasContent() && AsyncJob::doneAll(); // both directions stopped and inBuf drained
+}
+
+void
+Comm::TcpReceiver::swanSong()
+{
+ if (closed_ != NULL) // nil when tcpConnectionInit() never registered a close handler
+ closed_->cancel("Comm::TcpReceiver::swanSong");
+
+ if (Comm::IsConnOpen(tcp)) // close the connection ourselves if it is still open
+ tcp->close();
+}
+
+void
+Comm::TcpReceiver::releaseTcpConnection(const char *reason)
+{
+ // Used by server-side to release the connection before
+ // storing it in Pconn pool. Does not close the connection.
+ comm_remove_close_handler(tcp->fd, closed_);
+ closed_->cancel(reason); // NOTE(review): unlike swanSong(), no nil check -- confirm tcpConnectionInit() always ran first
+ stopReading(); // also cancel any read that is still scheduled
+}
+
+void
+Comm::TcpReceiver::stopReading()
+{
+ /* NP: This is a hack needed to allow TunnelStateData
+ * to take control of a socket despite any scheduled read.
+ */
+ if (reading()) {
+ comm_read_cancel(tcp->fd, reader_);
+ reader_ = NULL; // reading() is false again, so readSomeData() may schedule a new read
+ }
+}
+
+void
+Comm::TcpReceiver::stopReceiving(const char *error)
+{
+ debugs(5, 4, "receiving error (" << tcp << "): " << error <<
+ "; old sending error: " << (stoppedSending() ? stoppedSending_ : "none"));
+
+ if (const char *oldError = stoppedReceiving()) {
+ debugs(5, 3, "already stopped receiving: " << oldError);
+ return; // nothing has changed as far as this connection is concerned
+ }
+
+ stoppedReceiving_ = error; // only the first reason is recorded
+
+ if (const char *sendError = stoppedSending()) {
+ debugs(5, 3, "closing because also stopped sending: " << sendError);
+ closed_->cancel("graceful close"); // cancel our close callback before closing ourselves
+ tcp->close();
+ }
+}
+
+void
+Comm::TcpReceiver::stopSending(const char *error)
+{
+ debugs(5, 4, "sending error (" << tcp << "): " << error <<
+ "; old receiving error: " <<
+ (stoppedReceiving() ? stoppedReceiving_ : "none"));
+
+ if (const char *oldError = stoppedSending()) {
+ debugs(5, 3, "already stopped sending: " << oldError);
+ return; // nothing has changed as far as this connection is concerned
+ }
+ stoppedSending_ = error; // only the first reason is recorded
+
+ if (!stoppedReceiving()) {
+ if (const int64_t expecting = mayNeedToReadMore()) { // any non-zero value, including -1 (unknown amount), means wait
+ debugs(5, 5, "must still read " << expecting <<
+ " bytes with " << inBuf.contentSize() << " unused");
+ return; // wait for the receiver to finish reading
+ }
+ }
+ closed_->cancel("graceful close"); // cancel our close callback before closing ourselves
+ tcp->close();
+}
+
+bool
+Comm::TcpReceiver::maybeMakeSpaceAvailable()
+{
+ /*
+ * why <2? Because delayAwareRead() won't actually read if
+ * you ask it to read 1 byte. The delayed read(2) request
+ * just gets re-queued until the client side drains, then
+ * the I/O thread hangs. Better to not register any read
+ * handler until we get a notification from someone that
+ * it's okay to read again if the buffer cannot grow.
+ */
+
+ if (inBuf.spaceSize() < 2) {
+ if (!inBuf.hasPotentialSpace()) {
+ debugs(5, 5, "buffer full: " << inBuf.contentSize() << " of " << (inBuf.max_capacity-1) << " bytes");
+ return false;
+ }
+ (void)inBuf.space(inBuf.contentSize()*2); // called only to request more space; the returned pointer is unused
+ debugs(5, 5, "growing buffer: content-size=" << inBuf.contentSize() << " capacity=" << inBuf.capacity);
+ }
+
+ // in case the grow operation above failed for any reason.
+ return (inBuf.spaceSize() > 1);
+}
+
+void
+Comm::TcpReceiver::readSomeData()
+{
+ // one read() at a time
+ if (reading())
+ return;
+
+ // useless to read() after aborting read()
+ if (stoppedReceiving())
+ return;
+
+ // useless to try when there is no buffer space available
+ if (!maybeMakeSpaceAvailable())
+ return;
+
+ debugs(5, 5, tcp << ": reading... buffer space " << inBuf.spaceSize() << " bytes.");
+
+ typedef CommCbMemFunT<Comm::TcpReceiver, CommIoCbParams> Dialer;
+ reader_ = JobCallback(33, 5, Dialer, this, Comm::TcpReceiver::readIoHandler);
+ if (!maybeDelayRead(reader_)) // the child class may defer this read; otherwise schedule it now
+ comm_read(tcp, inBuf.space(), inBuf.spaceSize(), reader_);
+}
+
+/// identifies whether the read() event was due to a network error happening
+bool
+Comm::TcpReceiver::readWasError(comm_err_t flag, int size, int xerrno) const
+{
+ if (flag != COMM_OK) {
+ debugs(5, 2, tcp << ": got flag " << flag);
+ return true;
+ }
+
+ if (size < 0) {
+ if (!ignoreErrno(xerrno)) {
+ debugs(5, 2, tcp << " read failure: " << xstrerr(xerrno));
+ return true;
+ } else if (!inBuf.hasContent()) { // ignorable errno: only logged, not reported as an error
+ debugs(5, 2, tcp << ": no data to process (" << xstrerr(xerrno) << ")");
+ }
+ }
+
+ return false;
+}
+
+void
+Comm::TcpReceiver::readIoHandler(const CommIoCbParams &io)
+{
+ debugs(5, 5, io.conn << " size " << io.size);
+ Must(reading());
+ reader_ = NULL; // the scheduled read has completed; reading() is false from here on
+
+ /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */
+ if (io.flag == COMM_ERR_CLOSING) {
+ debugs(5, 5, io.conn << " closing Bailout.");
+ return;
+ }
+
+ assert(Comm::IsConnOpen(tcp));
+ assert(io.conn->fd == tcp->fd);
+
+ /*
+ * Don't reset the timeout value here. The timeout value will be
+ * set to Config.Timeout.request by httpAccept() and
+ * clientWriteComplete(), and should apply to the request as a
+ * whole, not individual read() calls. Plus, it breaks our
+ * lame half-close detection
+ */
+ if (readWasError(io.flag, io.size, io.xerrno)) {
+ noteTcpReadError(io.xerrno); // let the child class react before the connection is closed
+ io.conn->close();
+ return;
+ }
+
+ if (io.flag == COMM_OK) { // always true here: readWasError() returns true for any other flag
+ if (io.size > 0) {
+ updateByteCountersOnRead(io.size);
+ inBuf.append(io.buf, io.size);
+
+ } else if (io.size == 0) {
+ debugs(5, 5, io.conn << " closed?");
+ stopReceiving("zero sized read(2) result");
+
+ // if the connection is still possibly sending
+ // the child class may be able to stop immediately
+ if (const char *reason = maybeFinishedWithTcp()) {
+ stopSending(reason); // will close connection
+ return;
+ }
+
+ // if already stopped sending, the above will close the connection
+ if (stoppedSending())
+ return;
+
+ /* It might be half-closed, we can't tell */
+ fd_table[io.conn->fd].flags.socket_eof = true;
+ commMarkHalfClosed(io.conn->fd);
+ fd_note(io.conn->fd, "half-closed");
+ }
+ }
+
+ bool mayReadMore = true;
+ // pass handling on to child instance code
+ if (inBuf.hasContent())
+ mayReadMore = processReadBuffer(inBuf);
+
+ if (mayReadMore && !maybeMakeSpaceAvailable()) {
+ stopReceiving("full read buffer - but processing does not free any space");
+ mayReadMore = false;
+ }
+
+ // schedule another read() - unless aborted by processing actions
+ if (mayReadMore)
+ readSomeData();
+}
+
+/* This is a handler normally called by comm_close() */
+void
+Comm::TcpReceiver::tcpConnectionClosed(const CommCloseCbParams &io)
+{
+ stopReceiving("TCP connection closed"); // record the reason unless an earlier one is already set
+ stopSending("TCP connection closed"); // io is unused; both directions are simply marked as stopped
+}
=== added file 'src/comm/TcpReceiver.h'
--- src/comm/TcpReceiver.h 1970-01-01 00:00:00 +0000
+++ src/comm/TcpReceiver.h 2014-01-05 13:21:41 +0000
@@ -0,0 +1,132 @@
+#ifndef SQUID_SRC_COMM_TCPRECEIVER_H
+#define SQUID_SRC_COMM_TCPRECEIVER_H
+
+#include "base/AsyncCall.h"
+#include "base/AsyncJob.h"
+#include "comm/Connection.h"
+#include "comm_err_t.h"
+#include "MemBuf.h"
+
+class CommIoCbParams;
+
+namespace Comm {
+
+class TcpReceiver : virtual public AsyncJob
+{
+public:
+ explicit TcpReceiver(const Comm::ConnectionPointer &c);
+ virtual ~TcpReceiver() {}
+
+ // AsyncJob API
+ virtual bool doneAll() const;
+ virtual void swanSong();
+
+ /// initialize the TCP connection event handlers
+ /// close(2) callback etc.
+ void tcpConnectionInit();
+
+ /// releases TCP connection event handlers without closing it
+ void releaseTcpConnection(const char *reason);
+
+ /// whether a read(2) operation is currently underway
+ bool reading() const {return reader_!=NULL;}
+
+ /** Hack to cancel a read if one is scheduled, without blocking future socket use.
+ * \note Avoid using this method when possible. If the read(2) is done but
+ * AsyncCall is still queued the read(2) bytes will be lost permanently.
+ */
+ void stopReading();
+
+ /// note receiving error and close as soon as we have done with writing as well
+ void stopReceiving(const char *error);
+
+ /// the reason why we stopped receiving data, or nil if we have not stopped
+ const char *stoppedReceiving() const { return stoppedReceiving_; }
+
+ /// note response sending error and close as soon as we read the request
+ void stopSending(const char *error);
+
+ /// the reason why we stopped sending the response, or nil if we have not stopped
+ const char *stoppedSending() const { return stoppedSending_; }
+
+ /** Called when sending has stopped to check if more read(2)s may be required.
+ *
+ * \retval >0 Number of bytes expected still to arrive.
+ * \retval -1 More data still expected to arrive, unknown number of bytes at this time.
+ * \retval 0 No more bytes expected right now.
+ */
+ virtual int64_t mayNeedToReadMore() const = 0;
+
+ /// called when buffer may be used to receive new network data
+ bool maybeMakeSpaceAvailable();
+
+ /**
+ * Called before scheduling a read(2) operation in case the
+ * child class uses delay_pools to slow read(2) I/O down.
+ * \return true if this read has been deferred.
+ */
+ // TODO: make the delaying part of TcpReceiver's task
+ virtual bool maybeDelayRead(const AsyncCall::Pointer &call) {return false;}
+
+ /** called when there is new buffered data to process.
+ *
+ * If the processing requires further read(2) to be halted temporarily it
+ * may return false. The processor is then responsible for ensuring that
+ * readSomeData() is called when read(2) calls are to be resumed.
+ *
+ * \retval true if additional read(2) should be scheduled by the caller.
+ * \retval false if read(2) is to be suspended.
+ */
+ virtual bool processReadBuffer(MemBuf &) = 0;
+
+ /** Called when there is an error performing read(2)
+ * so the child class can perform any cleanup or error handling.
+ * The TCP connection will be closed immediately after this method
+ * completes.
+ */
+ virtual void noteTcpReadError(int) {}
+
+ /// Called when there has been a successful read(2).
+ /// The child class is responsible for data counting.
+ virtual void updateByteCountersOnRead(size_t) = 0;
+
+ /// Attempt to read some data.
+ /// Will call processReadBuffer() when there is data to process.
+ void readSomeData();
+
+ /// callback to handle TCP read(2) input
+ void readIoHandler(const CommIoCbParams &io);
+
+ /**
+ * called when TCP 0-size read(2) occurs to ask the child class
+ * whether it is able to stop sending yet.
+ *
+ * \return a reason for stopping I/O,
+ * or NULL to continue I/O with client half-closed.
+ */
+ virtual const char * maybeFinishedWithTcp() = 0;
+
+ Comm::ConnectionPointer tcp; ///< the TCP connection this job reads from
+
+ MemBuf inBuf; ///< bytes read from tcp and handed to processReadBuffer()
+
+private:
+ bool readWasError(comm_err_t flag, int size, int xerrno) const;
+ void tcpConnectionClosed(const CommCloseCbParams &io);
+
+ /// the reason why we no longer read(2) or nil
+ const char *stoppedReceiving_;
+
+ /// the reason why we no longer write(2) or nil
+ const char *stoppedSending_;
+
+ /// callback to stop traffic processing when FD closes
+ AsyncCall::Pointer closed_;
+
+ /// set when we are reading
+ AsyncCall::Pointer reader_;
+};
+
+} // namespace Comm
+
+#endif /* SQUID_SRC_COMM_TCPRECEIVER_H */