Here is some code. It probably will not compile, but it should give you the idea.

Bill

> On Wed, 14 Nov 2001, Bill Stoddard wrote:
> 
> > core_output_filter does buffering based on minimum write thresholds,
> > checks for iovec limits and multiple file buckets, and splits brigades
> > accordingly, etc.  These functions would remain in core_output_filter.
> > The network_out_filter would simply iterate over the buckets and send
> > their contents on the network interface (apr_sock calls or
> > alternatives). The network_out_filter would do no buffering, no
> > splitting brigades and no decision making regarding whether content
> > should be sent or not. It just writes the brigade passed to it to the
> > network and it is done.
> 
> I'm not sure I see how that would work.  The whole reason
> core_output_filter does so much extra work is that there are several
> different ways to write to the socket, and different ones are better in
> different situations.  For example, how is network_out_filter going to
> know when to use sendfile() and when to use writev()?  Are you suggesting
> that core_output_filter will stick some metadata buckets in the brigade or
> change the bucket type to some custom bucket type with a special meaning
> (eg a file bucket would always get sendfiled but an iovec bucket would get
> writev'd etc)?
> 
> If it's ready, maybe the easiest thing would be if you just posted the
> code for network_out_filter so we all had a better idea of what you're
> proposing.  Code is speech.  :)
> 
> --Cliff
> 
> --------------------------------------------------------------
>    Cliff Woolley
>    [EMAIL PROTECTED]
>    Charlottesville, VA
> 
/* ====================================================================
 * The Apache Software License, Version 1.1
 *
 * Copyright (c) 2000-2001 The Apache Software Foundation.  All rights
 * reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by the
 *        Apache Software Foundation (http://www.apache.org/)."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Apache" and "Apache Software Foundation" must
 *    not be used to endorse or promote products derived from this
 *    software without prior written permission. For written
 *    permission, please contact [EMAIL PROTECTED]
 *
 * 5. Products derived from this software may not be called "Apache",
 *    nor may "Apache" appear in their name, without prior written
 *    permission of the Apache Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 * Portions of this software are based upon public domain software
 * originally written at the National Center for Supercomputing Applications,
 * University of Illinois, Urbana-Champaign.
 */

#include "apr.h"
#include "apr_strings.h"

/*
 * emulate_sendfile()
 * Sends the header iovecs (if any), then 'length' bytes of file fd, then
 * the trailer iovecs (if any) to c->client_socket using ordinary
 * read/send calls. Returns only when everything has been sent (partial
 * writes are retried) or when one of the underlying calls fails.
 *
 *   c       connection whose client_socket is written to
 *   fd      file to read from
 *   hdtr    optional header/trailer iovecs; may be NULL
 *   offset  file offset to start from; when 0 no seek is issued and the
 *           file is read from its current position
 *   length  number of file bytes to send
 *   nbytes  out: bytes counted as sent. Header and trailer bytes are only
 *           added when their writev_it_all() call succeeds completely.
 *
 * Returns APR_SUCCESS, or the first failing status from writev_it_all(),
 * apr_file_seek(), apr_file_read() or apr_send().
 *
 * NOTE: writev_it_all() is defined further down in this file; a prototype
 * has to be visible before this function for the file to compile.
 */
static apr_status_t emulate_sendfile(conn_rec *c, apr_file_t *fd, 
                                     apr_hdtr_t *hdtr, apr_off_t offset, 
                                     apr_size_t length, apr_size_t *nbytes) 
{
    apr_status_t rv = APR_SUCCESS;
    /* Remaining number of bytes in the file to send. Kept as apr_size_t so
     * that it has the same width and signedness as 'length'; a 32 bit
     * signed counter would truncate large lengths and turn the comparison
     * against sizeof(buffer) into a signed/unsigned mix.
     */
    apr_size_t togo;
    apr_size_t sendlen = 0;
    apr_size_t bytes_sent;
    apr_int32_t i;
    apr_size_t o;            /* Track the buffer offset for partial writes */
    char buffer[8192];

    *nbytes = 0;

    /* Send the headers 
     * writev_it_all handles partial writes.
     * XXX: optimization... if headers are less than MIN_WRITE_SIZE, copy 
     * them into buffer
     */
    if ( hdtr && hdtr->numheaders > 0 ) {
        for (i = 0; i < hdtr->numheaders; i++) {
            sendlen += hdtr->headers[i].iov_len;
        }
        rv = writev_it_all(c->client_socket, hdtr->headers, hdtr->numheaders,
                           sendlen, &bytes_sent);
        if (rv == APR_SUCCESS)
            *nbytes += bytes_sent;     /* track total bytes sent */
    }

    /* Seek the file to 'offset' */
    if (offset != 0 && rv == APR_SUCCESS) {
        rv = apr_file_seek(fd, APR_SET, &offset);
    }

    /* Send the file, making sure to handle partial writes */
    togo = length;
    while (rv == APR_SUCCESS && togo) {
        /* Read at most one buffer's worth, and never more than is left */
        sendlen = togo > sizeof(buffer) ? sizeof(buffer) : togo;
        o = 0;
        rv = apr_file_read(fd, buffer, &sendlen);
        /* Drain what was just read; sendlen now counts unsent buffer bytes */
        while (rv == APR_SUCCESS && sendlen) {
            bytes_sent = sendlen;
            rv = apr_send(c->client_socket, &buffer[o], &bytes_sent);
            if (rv == APR_SUCCESS) {
                sendlen -= bytes_sent; /* sendlen != bytes_sent ==> partial write */
                o += bytes_sent;       /* o is where we are in the buffer */
                *nbytes += bytes_sent;
                togo -= bytes_sent;    /* track how much of the file we've sent */
            }
        }
    }

    /* Send the trailers 
     * XXX: optimization... if it will fit, send this on the last send in the 
     * loop above
     */
    sendlen = 0;
    if ( rv == APR_SUCCESS && hdtr && hdtr->numtrailers > 0 ) {
        for (i = 0; i < hdtr->numtrailers; i++) {
            sendlen += hdtr->trailers[i].iov_len;
        }
        rv = writev_it_all(c->client_socket, hdtr->trailers, hdtr->numtrailers,
                           sendlen, &bytes_sent);
        if (rv == APR_SUCCESS)
            *nbytes += bytes_sent;
    }

    return rv;
}

/*
 * writev_it_all()
 * Write 'len' bytes described by the 'nvec' iovecs in 'vec' to socket 's',
 * calling apr_sendv() again after every short write until the running
 * total equals 'len' or apr_sendv() fails.
 *
 * The iovec array is modified in place: when a write stops part way
 * through an entry, that entry's iov_base/iov_len are advanced past the
 * bytes already written.
 *
 * On return *nbytes holds the bytes written by the successful apr_sendv()
 * calls. Returns APR_SUCCESS or the failing apr_sendv() status.
 */
static apr_status_t writev_it_all(apr_socket_t *s,
                                  struct iovec *vec, int nvec,
                                  apr_size_t len, apr_size_t *nbytes)
{
    apr_size_t total = 0;   /* bytes written across all calls so far */
    int cur = 0;            /* index of the first iovec not fully written */

    *nbytes = 0;

    /* XXX handle checking for non-blocking socket */
    while (total != len) {
        apr_size_t sent = len - total;
        apr_size_t span;
        apr_status_t status;

        status = apr_sendv(s, &vec[cur], nvec - cur, &sent);
        total += sent;
        if (status != APR_SUCCESS) {
            return status;
        }
        *nbytes += sent;

        if (total >= len) {
            continue;       /* nothing to adjust; loop test decides */
        }

        /* Short write: walk forward over the entries this call consumed.
         * 'span' is the combined length of vec[first]..vec[cur], where
         * 'first' is the entry this call started from.
         */
        span = vec[cur].iov_len;
        while (sent >= span && cur + 1 < nvec) {
            ++cur;
            span += vec[cur].iov_len;
        }

        if (sent < span) {
            /* vec[cur] was only partly written; (span - sent) bytes of it
             * remain, so move its base forward and shrink it to match.
             */
            apr_size_t remaining = span - sent;

            vec[cur].iov_base = (char *) vec[cur].iov_base +
                (vec[cur].iov_len - remaining);
            vec[cur].iov_len = remaining;
        }
    }

    return APR_SUCCESS;
}

/* sendfile_it_all()
 *  Send the header iovecs, the file data and the trailer iovecs with
 *  apr_sendfile(), re-issuing the call after every partial write.
 *  Returns only when total_bytes_left has reached zero or apr_sendfile()
 *  has reported an error; the return value is the status of the last
 *  apr_sendfile() call.
 *
 *  c                 connection; data goes to c->client_socket
 *  fd                file handed to apr_sendfile()
 *  hdtr              header/trailer iovecs. Dereferenced unconditionally
 *                    after a partial write, so it must not be NULL.
 *                    Modified in place: the headers/trailers pointers and
 *                    counts are advanced, and a partly written entry has
 *                    its iov_base/iov_len adjusted.
 *  file_offset       offset in the file to start sending from
 *  file_bytes_left   number of file bytes still to send
 *  total_bytes_left  bytes still to send overall; the caller in this file
 *                    passes header + trailer bytes plus the file length
 *  flags             passed through to apr_sendfile()
 */

#if APR_HAS_SENDFILE
static apr_status_t sendfile_it_all(conn_rec   *c, 
                                    apr_file_t *fd,
                                    apr_hdtr_t *hdtr, 
                                    apr_off_t   file_offset,
                                    apr_size_t  file_bytes_left, 
                                    apr_size_t  total_bytes_left,
                                    apr_int32_t flags)
{
    apr_status_t rv;
#ifdef AP_DEBUG
    /* Only declared in debug builds; it is referenced solely inside the
     * AP_DEBUG_ASSERT below (presumably compiled out otherwise -- confirm
     * against the AP_DEBUG_ASSERT definition).
     */
    apr_int32_t timeout = 0;
#endif

    AP_DEBUG_ASSERT((apr_getsocketopt(c->client_socket, APR_SO_TIMEOUT, 
                       &timeout) == APR_SUCCESS) && 
                     timeout > 0);  /* socket must be in timeout mode */ 
    do {
        /* In: file bytes requested. Out: the code below treats it as the
         * number of bytes written by this call, counted across headers,
         * file data and trailers.
         */
        apr_size_t tmplen = file_bytes_left;
        
        rv = apr_sendfile(c->client_socket, fd, hdtr, &file_offset, &tmplen, 
                          flags);
        total_bytes_left -= tmplen;
        if (!total_bytes_left || rv != APR_SUCCESS) {
            return rv;        /* normal case & error exit */ 
        }

        AP_DEBUG_ASSERT(total_bytes_left > 0 && tmplen > 0);
        
        /* partial write, oooh noooo... 
         * Skip over any header data which was written.
         * tmplen is consumed as we go: whatever is left after this loop
         * is attributed to the file, then to the trailers.
         */
        while (tmplen && hdtr->numheaders) {
            if (tmplen >= hdtr->headers[0].iov_len) {
                /* whole header entry was written: drop it */
                tmplen -= hdtr->headers[0].iov_len;
                --hdtr->numheaders;
                ++hdtr->headers;
            }
            else {
                /* entry partly written: advance it past the sent bytes */
                char *iov_base = (char *)hdtr->headers[0].iov_base;

                hdtr->headers[0].iov_len -= tmplen;
                iov_base += tmplen;
                hdtr->headers[0].iov_base = iov_base;
                tmplen = 0;
            }
        }

        /* Skip over any file data which was written */

        if (tmplen <= file_bytes_left) {
            /* the write ended inside (or exactly at the end of) the file
             * data, so no trailer bytes were written yet
             */
            file_offset += tmplen;
            file_bytes_left -= tmplen;
            continue; 
        }
        /* the whole file went out; the remainder of tmplen is trailer data */
        tmplen -= file_bytes_left;
        file_bytes_left = 0;
        file_offset = 0;
        
        /* Skip over any trailer data which was written */
        
        while (tmplen && hdtr->numtrailers) {
            if (tmplen >= hdtr->trailers[0].iov_len) {
                /* whole trailer entry was written: drop it */
                tmplen -= hdtr->trailers[0].iov_len;
                --hdtr->numtrailers;
                ++hdtr->trailers;
            }
            else {
                /* entry partly written: advance it past the sent bytes */
                char *iov_base = (char *)hdtr->trailers[0].iov_base;

                hdtr->trailers[0].iov_len -= tmplen;
                iov_base += tmplen;
                hdtr->trailers[0].iov_base = iov_base;
                tmplen = 0;
            }
        }
    } while (1);
}
#endif
 
/* 
 * Per-filter context for network_output_filter.
 *
 * Intent: we may save a brigade only if a non blocking write fails 
 * with EWOULDBLOCK. This filter does not otherwise save or cache brigades.
 *
 * NOTE(review): in the code visible here nothing ever stores a brigade
 * into 'b'; network_output_filter only reads it and resets it to NULL.
 */
typedef struct NETWORK_OUTPUT_FILTER_CTX {
    /* brigade left over from an earlier call, or NULL when there is none */
    apr_bucket_brigade *b;
} network_output_filter_ctx_t;
/* Size of the header and trailer iovec arrays in network_output_filter */
#define MAX_IOVEC_TO_WRITE 16

/*
 * network_output_filter()
 * Writes the contents of brigade 'b' to c->client_socket and destroys the
 * brigade. It does no buffering and no brigade splitting of its own.
 *
 * One pass is made over the brigade, stopping at the first EOS or FLUSH
 * bucket:
 *   - data buckets seen before a file bucket become header iovecs,
 *   - a file bucket supplies the fd/offset/length to send,
 *   - data buckets seen after the file bucket become trailer iovecs.
 * With a file present the data goes out through sendfile_it_all() (when
 * APR_HAS_SENDFILE) or emulate_sendfile(); otherwise through
 * writev_it_all().
 *
 * Assumptions that are NOT checked here (the upstream core_output_filter
 * is expected to guarantee them):
 *   1. at most one file bucket per brigade (a later one would replace the
 *      earlier fd),
 *   2. at most MAX_IOVEC_TO_WRITE header iovecs and MAX_IOVEC_TO_WRITE
 *      trailer iovecs (more would overrun the stack arrays),
 *   3. nothing that still needs sending follows an EOS/FLUSH bucket (such
 *      buckets are never written; presumably they are released with the
 *      brigade).
 *
 * Returns APR_SUCCESS when everything was written, otherwise the failing
 * status from apr_bucket_read() or from the network write. c->aborted is
 * set when the write failed because the connection was aborted or reset,
 * or the pipe was closed.
 */
static apr_status_t network_output_filter(ap_filter_t *f, apr_bucket_brigade *b)
{
    apr_status_t rv;
    conn_rec *c = f->c;
    network_output_filter_ctx_t *ctx = f->ctx;
    apr_size_t nbytes = 0;      /* header + trailer bytes collected */
    apr_bucket *e;

    /* one group of iovecs per pass over the brigade */
    apr_size_t nvec = 0;
    apr_size_t nvec_trailers = 0;
    struct iovec vec[MAX_IOVEC_TO_WRITE];
    struct iovec vec_trailers[MAX_IOVEC_TO_WRITE];

    /* one file per pass over the brigade */
    apr_file_t *fd = NULL;
    apr_size_t flen = 0;
    apr_off_t foffset = 0;
    
    if (ctx == NULL) {
        f->ctx = ctx = apr_pcalloc(c->pool, sizeof(*ctx));
    }

    /* If we have a saved brigade, concatenate the new brigade to it 
     * XXX: I don't think this filter should save brigades like this.
     * If the network keeps returning EWOULDBLOCK up the stack, then 
     * we could eventually end up with a really big brigade if the upstream
     * driver keeps pumping bytes down the stack. Well, then again perhaps
     * this would be a mistake of the driver to keep sending bytes down upon
     * receiving EWOULDBLOCK. 
     */
    if (ctx->b) {
        APR_BRIGADE_CONCAT(ctx->b, b);
        b = ctx->b;
        ctx->b = NULL;
    }

    /* Iterate over the brigade and collect iovecs and an fd if there
     * is one. The core_output_filter will ensure:
     * 1. that we will get no more than a single file bucket in a brigade
     * 2. that we will have no more than MAX_IOVEC_TO_WRITE iovecs
     */
    APR_BRIGADE_FOREACH(e, b) {
        if (APR_BUCKET_IS_EOS(e) || APR_BUCKET_IS_FLUSH(e)) {
            break;
        }
        else if (APR_BUCKET_IS_FILE(e)) {
            apr_bucket_file *a = e->data;
            
            fd = a->fd;
            flen = e->length;
            foffset = e->start;
        }
        else {
            const char *str = NULL;
            apr_size_t n = 0;

            rv = apr_bucket_read(e, &str, &n, APR_BLOCK_READ);
            if (rv != APR_SUCCESS) {
                /* Nothing has been written yet; give up on this brigade
                 * rather than sending a length/pointer we cannot trust.
                 */
                apr_brigade_destroy(b);
                return rv;
            }
            if (n) {
                if (!fd) {
                    /* data ahead of the file: header */
                    vec[nvec].iov_base = (char*) str;
                    vec[nvec].iov_len = n;
                    nvec++;
                }
                else {
                    /* data behind the file: trailer */
                    vec_trailers[nvec_trailers].iov_base = (char*) str;
                    vec_trailers[nvec_trailers].iov_len = n;
                    nvec_trailers++;
                }
                nbytes += n;
            }
        }
    }

    /* On to the network transport */
    if (fd) {
        apr_hdtr_t hdtr;
#if APR_HAS_SENDFILE
        apr_int32_t flags = 0;
#endif
        
        memset(&hdtr, '\0', sizeof(hdtr));
        if (nvec) {
            hdtr.numheaders = nvec;
            hdtr.headers = vec;
        }
        if (nvec_trailers) {
            hdtr.numtrailers = nvec_trailers;
            hdtr.trailers = vec_trailers;
        }
#if APR_HAS_SENDFILE
        if (!c->keepalive) {
            /* Prepare the socket to be reused */
            flags |= APR_SENDFILE_DISCONNECT_SOCKET;
        }
        rv = sendfile_it_all(c,        /* the connection            */
                             fd,       /* the file to send          */
                             &hdtr,    /* header and trailer iovecs */
                             foffset,  /* offset in the file to begin
                                          sending from              */
                             flen,     /* length of file            */
                             nbytes + flen, /* total length including
                                               headers                */
                             flags);   /* apr_sendfile flags        */
        
        /* If apr_sendfile() returns APR_ENOTIMPL, call emulate_sendfile().
         * emulate_sendfile() is useful to enable the same Apache binary 
         * distribution to support Windows NT/2000 (supports TransmitFile) 
         * and Win95/98 (do not support TransmitFile)
         */
        if (rv == APR_ENOTIMPL)
#endif
        {
            apr_size_t unused_bytes_sent;
            rv = emulate_sendfile(c, fd, &hdtr, foffset, flen, 
                                  &unused_bytes_sent);
        }
    }
    else {
        apr_size_t unused_bytes_sent;
        
        rv = writev_it_all(c->client_socket, 
                           vec, nvec, 
                           nbytes, &unused_bytes_sent);
    }

    /* The brigade is consumed whether or not the write succeeded */
    apr_brigade_destroy(b);
    if (rv != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_INFO, rv, c->base_server,
                     "network_output_filter: writing data to the network");
        if (APR_STATUS_IS_ECONNABORTED(rv) ||
            APR_STATUS_IS_ECONNRESET(rv) ||
            APR_STATUS_IS_EPIPE(rv)) {
            c->aborted = 1;
        }
        return rv;
    }

    return APR_SUCCESS;
}


Reply via email to