This one with the mk2 patch actually attached.

Amos
=== modified file 'src/comm/Makefile.am'
--- src/comm/Makefile.am        2012-03-29 09:22:41 +0000
+++ src/comm/Makefile.am        2013-10-16 12:14:31 +0000
@@ -23,6 +23,8 @@
        ModSelectWin32.cc \
        TcpAcceptor.cc \
        TcpAcceptor.h \
+       TcpReceiver.cc \
+       TcpReceiver.h \
        UdpOpenDialer.h \
        Write.cc \
        Write.h \

=== added file 'src/comm/TcpReceiver.cc'
--- src/comm/TcpReceiver.cc     1970-01-01 00:00:00 +0000
+++ src/comm/TcpReceiver.cc     2014-01-05 13:21:33 +0000
@@ -0,0 +1,263 @@
+/*
+ * DEBUG: section 05    TCP Read
+ *
+ * - level 2 minor TCP errors
+ * - level 3 duplicate reasons for halting I/O (bugs? only need to halt once)
+ * - level 4 reasons for errors and halting I/O
+ * - level 5 common I/O and buffer activity
+ */
+#include "squid.h"
+#include "comm.h"
+#include "comm/TcpReceiver.h"
+#include "Debug.h"
+#include "fd.h"
+#include "fde.h"
+#include "StatCounters.h"
+#include "tools.h"
+
+/// Starts with no stop reasons recorded and no close or read callbacks set.
+Comm::TcpReceiver::TcpReceiver(const Comm::ConnectionPointer &c) :
+        AsyncJob("Comm::TcpReceiver"),
+        tcp(c),
+        stoppedReceiving_(NULL), stoppedSending_(NULL),
+        closed_(), reader_()
+{
+}
+
+void
+Comm::TcpReceiver::tcpConnectionInit()
+{
+    typedef CommCbMemFunT<Comm::TcpReceiver, CommCloseCbParams> Dialer;
+    closed_ = JobCallback(33, 5, Dialer, this, Comm::TcpReceiver::tcpConnectionClosed);
+    comm_add_close_handler(tcp->fd, closed_); // tcpConnectionClosed() is now a close handler of tcp->fd
+}
+
+bool
+Comm::TcpReceiver::doneAll() const
+{   // true once both directions have stopped, inBuf is empty, and AsyncJob agrees
+    return stoppedSending() && stoppedReceiving() && !inBuf.hasContent() && AsyncJob::doneAll();
+}
+
+void
+Comm::TcpReceiver::swanSong()
+{
+    if (closed_ != NULL) // stays unset unless tcpConnectionInit() was called
+        closed_->cancel("Comm::TcpReceiver::swanSong");
+    // the close callback is cancelled first, then the connection is closed if still open
+    if (Comm::IsConnOpen(tcp))
+        tcp->close();
+}
+
+void
+Comm::TcpReceiver::releaseTcpConnection(const char *reason)
+{   // Used by server-side to release the connection before storing it in Pconn pool
+    if (closed_ != NULL) { // stays unset unless tcpConnectionInit() was called
+        comm_remove_close_handler(tcp->fd, closed_);
+        closed_->cancel(reason);
+    }
+    stopReading();
+}
+
+void
+Comm::TcpReceiver::stopReading()
+{
+    /* NP: a hack that lets TunnelStateData take control of the
+     * socket even though a read may already be scheduled on it.
+     */
+    if (!reading())
+        return; // no read is scheduled, so there is nothing to cancel
+    comm_read_cancel(tcp->fd, reader_);
+    reader_ = NULL;
+}
+
+/// Records why reading stopped; closes the connection if writing has stopped as well.
+void
+Comm::TcpReceiver::stopReceiving(const char *error)
+{
+    debugs(5, 4, "receiving error (" << tcp << "): " << error <<
+           "; old sending error: " << (stoppedSending() ? stoppedSending_ : "none"));
+    if (const char *oldError = stoppedReceiving()) {
+        debugs(5, 3, "already stopped receiving: " << oldError);
+        return; // nothing has changed as far as this connection is concerned
+    }
+    stoppedReceiving_ = error;
+
+    if (const char *sendError = stoppedSending()) {
+        debugs(5, 3, "closing because also stopped sending: " << sendError);
+        if (closed_ != NULL) // stays unset unless tcpConnectionInit() was called
+            closed_->cancel("graceful close");
+        tcp->close();
+    }
+}
+
+/// Records why writing stopped; closes the connection unless more input is still expected.
+void
+Comm::TcpReceiver::stopSending(const char *error)
+{
+    debugs(5, 4, "sending error (" << tcp << "): " << error <<
+           "; old receiving error: " << (stoppedReceiving() ? stoppedReceiving_ : "none"));
+    if (const char *oldError = stoppedSending()) {
+        debugs(5, 3, "already stopped sending: " << oldError);
+        return; // nothing has changed as far as this connection is concerned
+    }
+    stoppedSending_ = error;
+
+    if (!stoppedReceiving()) {
+        if (const int64_t expecting = mayNeedToReadMore()) { // -1 (unknown amount) counts too
+            debugs(5, 5, "must still read " << expecting <<
+                   " bytes with " << inBuf.contentSize() << " unused");
+            return; // wait for the receiver to finish reading
+        }
+    }
+    if (closed_ != NULL) // stays unset unless tcpConnectionInit() was called
+        closed_->cancel("graceful close");
+    tcp->close();
+}
+
+bool
+Comm::TcpReceiver::maybeMakeSpaceAvailable()
+{
+    /*
+     * Why "at least 2 bytes"? delayAwareRead() will not actually read
+     * when asked for a single byte: the delayed read(2) request just
+     * gets re-queued until the client side drains, and then the I/O
+     * thread hangs. If the buffer cannot grow, it is better not to
+     * register any read handler until somebody notifies us that it
+     * is okay to read again.
+     */
+    if (inBuf.spaceSize() >= 2)
+        return true;
+
+    if (!inBuf.hasPotentialSpace()) {
+        debugs(5, 5, "buffer full: " << inBuf.contentSize() << " of " << (inBuf.max_capacity-1) << " bytes");
+        return false;
+    }
+    (void)inBuf.space(inBuf.contentSize()*2);
+    debugs(5, 5, "growing buffer: content-size=" << inBuf.contentSize() << " capacity=" << inBuf.capacity);
+
+    // re-check, in case the grow operation above failed for any reason
+    return (inBuf.spaceSize() > 1);
+}
+
+void
+Comm::TcpReceiver::readSomeData()
+{
+    // Do nothing when:
+    //  - a read() is already scheduled (one read() at a time);
+    //  - reading was aborted (useless to read() after that);
+    //  - no buffer space is available, nor can any be made.
+    if (reading() || stoppedReceiving() || !maybeMakeSpaceAvailable())
+        return;
+
+    debugs(5, 5, tcp << ": reading... buffer space " << inBuf.spaceSize() << " bytes.");
+
+    typedef CommCbMemFunT<Comm::TcpReceiver, CommIoCbParams> Dialer;
+    reader_ = JobCallback(33, 5, Dialer, this, Comm::TcpReceiver::readIoHandler);
+
+    // a child class may choose to defer this read; then it is not issued here
+    const bool deferred = maybeDelayRead(reader_);
+    if (deferred)
+        return;
+
+    comm_read(tcp, inBuf.space(), inBuf.spaceSize(), reader_);
+}
+
+/// identifies whether the read() event was due to a network error happening
+bool
+Comm::TcpReceiver::readWasError(comm_err_t flag, int size, int xerrno) const
+{
+    if (flag != COMM_OK) {
+        debugs(5, 2, tcp << ": got flag " << flag);
+        return true;
+    }
+
+    if (size < 0) {
+        if (!ignoreErrno(xerrno)) {
+            debugs(5, 2, tcp << " read failure: " << xstrerr(xerrno));
+            return true;
+        } else if (!inBuf.hasContent()) {
+            debugs(5, 2, tcp << ": no data to process (" << xstrerr(xerrno) << 
")");
+        }
+    }
+
+    return false;
+}
+
+/// Comm callback for a finished read(2): handles errors and zero-sized reads,
+/// lets the child class process inBuf, then schedules the next read if allowed.
+void
+Comm::TcpReceiver::readIoHandler(const CommIoCbParams &io)
+{
+    debugs(5, 5, io.conn << " size " << io.size);
+    Must(reading());
+    reader_ = NULL;
+
+    /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */
+    if (io.flag == COMM_ERR_CLOSING) {
+        debugs(5, 5, io.conn << " closing Bailout.");
+        return;
+    }
+
+    assert(Comm::IsConnOpen(tcp));
+    assert(io.conn->fd == tcp->fd);
+
+    /*
+     * Don't reset the timeout value here.  The timeout value will be
+     * set to Config.Timeout.request by httpAccept() and
+     * clientWriteComplete(), and should apply to the request as a
+     * whole, not individual read() calls.  Plus, it breaks our
+     * lame half-close detection
+     */
+    if (readWasError(io.flag, io.size, io.xerrno)) {
+        noteTcpReadError(io.xerrno);
+        io.conn->close();
+        return;
+    }
+
+    // io.flag is COMM_OK from here on; a negative size (ignorable errno) skips both branches
+    if (io.size > 0) {
+        updateByteCountersOnRead(io.size);
+        // NOTE(review): io.buf presumably is the inBuf.space() given to comm_read(); confirm append() copes
+        inBuf.append(io.buf, io.size);
+    } else if (io.size == 0) {
+        debugs(5, 5, io.conn << " closed?");
+        stopReceiving("zero sized read(2) result");
+
+        // if still possibly sending, the child class may be able to stop immediately
+        if (const char *reason = maybeFinishedWithTcp()) {
+            stopSending(reason); // will close connection
+            return;
+        }
+
+        // if already stopped sending, the above will close the connection
+        if (stoppedSending())
+            return;
+
+        /* It might be half-closed, we can't tell */
+        fd_table[io.conn->fd].flags.socket_eof = true;
+        commMarkHalfClosed(io.conn->fd);
+        fd_note(io.conn->fd, "half-closed");
+    }
+
+    // pass handling on to child instance code
+    bool mayReadMore = true;
+    if (inBuf.hasContent())
+        mayReadMore = processReadBuffer(inBuf);
+
+    if (mayReadMore && !maybeMakeSpaceAvailable()) {
+        stopReceiving("full read buffer - but processing does not free any space");
+        mayReadMore = false;
+    }
+
+    // schedule another read() - unless aborted by processing actions
+    if (mayReadMore)
+        readSomeData();
+}
+
+/* Close handler registered by tcpConnectionInit(); normally called by comm_close() */
+void
+Comm::TcpReceiver::tcpConnectionClosed(const CommCloseCbParams &io)
+{
+    stopReceiving("TCP connection closed");
+    stopSending("TCP connection closed"); // both directions are over now; io itself is not used
+}

=== added file 'src/comm/TcpReceiver.h'
--- src/comm/TcpReceiver.h      1970-01-01 00:00:00 +0000
+++ src/comm/TcpReceiver.h      2014-01-05 13:21:41 +0000
@@ -0,0 +1,132 @@
+#ifndef SQUID_SRC_COMM_TCPRECEIVER_H
+#define SQUID_SRC_COMM_TCPRECEIVER_H
+
+#include "base/AsyncCall.h"
+#include "base/AsyncJob.h"
+#include "comm/Connection.h"
+#include "comm_err_t.h"
+#include "MemBuf.h"
+
+class CommIoCbParams;
+
+namespace Comm {
+
+class TcpReceiver : virtual public AsyncJob // reads TCP input into inBuf for a child class to process
+{
+public:
+    explicit TcpReceiver(const Comm::ConnectionPointer &c);
+    virtual ~TcpReceiver() {}
+
+    // AsyncJob API
+    virtual bool doneAll() const;
+    virtual void swanSong();
+
+    /// initialize the TCP connection event handlers
+    /// close(2) callback etc.
+    void tcpConnectionInit();
+
+    /// releases TCP connection event handlers without closing it
+    void releaseTcpConnection(const char *reason);
+
+    /// whether a read(2) operation is currently underway
+    bool reading() const {return reader_!=NULL;}
+
+    /** Hack to cancel a read if one is scheduled, without blocking future socket use.
+     * \note Avoid using this method when possible. If the read(2) is done but
+     *  AsyncCall is still queued the read(2) bytes will be lost permanently.
+     */
+    void stopReading();
+
+    /// note receiving error and close as soon as we are done with writing as well
+    void stopReceiving(const char *error);
+
+    /// the reason we stopped receiving data, or NULL if we have not stopped
+    const char *stoppedReceiving() const { return stoppedReceiving_; }
+
+    /// note response sending error and close as soon as we read the request
+    void stopSending(const char *error);
+
+    /// the reason we stopped sending the response, or NULL if we have not stopped
+    const char *stoppedSending() const { return stoppedSending_; }
+
+    /** Called when sending has stopped to check if more read(2)s may be required.
+     *
+     * \retval >0 Number of bytes expected still to arrive.
+     * \retval -1 More data still expected to arrive, unknown number of bytes at this time.
+     * \retval 0  No more bytes expected right now.
+     */
+    virtual int64_t mayNeedToReadMore() const = 0;
+
+    /// tries to ensure inBuf has room for new network data; returns false if it has not
+    bool maybeMakeSpaceAvailable();
+
+    /**
+     * Called before scheduling a read(2) operation in case the
+     * child class uses delay_pools to slow read(2) I/O down.
+     * \return true if this read has been deferred.
+     */
+    // TODO: make the delaying part of TcpReceiver's task
+    virtual bool maybeDelayRead(const AsyncCall::Pointer &call) {return false;}
+
+    /** called when there is new buffered data to process.
+     *
+     * If the processing requires further read(2) to be halted temporarily it
+     * may return false. The processor is then responsible for ensuring that
+     * readSomeData() is called when read(2) calls are to be resumed.
+     *
+     *  \retval true  if additional read(2) should be scheduled by the caller.
+     *  \retval false if read(2) is to be suspended.
+     */
+    virtual bool processReadBuffer(MemBuf &) = 0;
+
+    /** Called when there is an error performing read(2)
+     * so the child class can perform any cleanup or error handling.
+     * The TCP connection will be closed immediately after this method
+     * completes. The argument is the errno reported for the read.
+     */
+    virtual void noteTcpReadError(int) {}
+
+    /// Called when there has been a successful read(2).
+    /// The child class is responsible for data counting.
+    virtual void updateByteCountersOnRead(size_t) = 0;
+
+    /// Attempt to read some data.
+    /// Will call processReadBuffer() when there is data to process.
+    void readSomeData();
+
+    /// callback to handle TCP read(2) input
+    void readIoHandler(const CommIoCbParams &io);
+
+    /**
+     * called when TCP 0-size read(2) occurs to ask the child class
+     * whether it is able to stop sending yet.
+     *
+     * \return a reason for stopping I/O,
+     *         or NULL to continue I/O with client half-closed.
+     */
+    virtual const char * maybeFinishedWithTcp() = 0;
+
+    Comm::ConnectionPointer tcp; ///< the connection this job reads from
+
+    MemBuf inBuf; ///< buffer that received data is appended to
+
+private:
+    bool readWasError(comm_err_t flag, int size, int xerrno) const;
+    void tcpConnectionClosed(const CommCloseCbParams &io);
+
+    /// the reason why we no longer read(2) or nil
+    const char *stoppedReceiving_;
+
+    /// the reason why we no longer write(2) or nil
+    const char *stoppedSending_;
+
+    /// callback to stop traffic processing when FD closes
+    AsyncCall::Pointer closed_;
+
+    /// the scheduled read callback; set only while we are reading
+    AsyncCall::Pointer reader_;
+};
+
+} // namespace Comm
+
+#endif /* SQUID_SRC_COMM_TCPRECEIVER_H */

Reply via email to