Thanks, Graham, for this great patch; comments inline...

On Sun, Oct 4, 2015 at 12:10 PM,  <minf...@apache.org> wrote:
> Author: minfrin
> Date: Sun Oct  4 10:10:51 2015
> New Revision: 1706669
>
> URL: http://svn.apache.org/viewvc?rev=1706669&view=rev
> Log:
> core: Extend support for asynchronous write completion from the
> network filter to any connection or request filter.
>
[]
>
> Modified: httpd/httpd/trunk/include/util_filter.h
> URL: 
> http://svn.apache.org/viewvc/httpd/httpd/trunk/include/util_filter.h?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/include/util_filter.h (original)
> +++ httpd/httpd/trunk/include/util_filter.h Sun Oct  4 10:10:51 2015
> @@ -278,6 +278,13 @@ struct ap_filter_t {
>       *  to the request_rec, except that it is used for connection filters.
>       */
>      conn_rec *c;
> +
> +    /** Buffered data associated with the current filter. */
> +    apr_bucket_brigade *bb;
> +
> +    /** Dedicated pool to use for deferred writes. */
> +    apr_pool_t *deferred_pool;

Could we make these opaque, e.g.:

# util_filter.h:
struct ap_filter_data;
struct ap_filter_t {
    ...
    struct ap_filter_data *data;
};

# util_filter.c:
struct ap_filter_data
{
   /** Buffered data associated with the current filter. */
   apr_bucket_brigade *bb;

   /** Dedicated pool to use for deferred writes. */
   apr_pool_t *deferred_pool;
};

This would prevent them from being mangled elsewhere (they are
internal to util_filter.c anyway).
It could also possibly avoid walking the whole brigade on every
ap_filter_reinstate_brigade() call...

> +
>  };
>
[]
>
> Modified: httpd/httpd/trunk/modules/http/http_request.c
> URL: 
> http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http/http_request.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/http/http_request.c (original)
> +++ httpd/httpd/trunk/modules/http/http_request.c Sun Oct  4 10:10:51 2015
> @@ -256,6 +256,14 @@ AP_DECLARE(void) ap_process_request_afte
>      apr_bucket *b;
>      conn_rec *c = r->connection;
>
> +    /* Find the last request, taking into account internal
> +     * redirects. We want to send the EOR bucket at the end of
> +     * all the buckets so it does not jump the queue.
> +     */
> +    while (r->next) {
> +        r = r->next;
> +    }
> +
>      /* Send an EOR bucket through the output filter chain.  When
>       * this bucket is destroyed, the request will be logged and
>       * its pool will be freed
> @@ -264,8 +272,8 @@ AP_DECLARE(void) ap_process_request_afte
>      b = ap_bucket_eor_create(c->bucket_alloc, r);
>      APR_BRIGADE_INSERT_HEAD(bb, b);
>
> -    ap_pass_brigade(c->output_filters, bb);
> -
> +    ap_pass_brigade(r->output_filters, bb);;;

Nit: trailing ";;".

> +
[]
> Modified: httpd/httpd/trunk/server/mpm/event/event.c
> URL: 
> http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/event.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/server/mpm/event/event.c (original)
> +++ httpd/httpd/trunk/server/mpm/event/event.c Sun Oct  4 10:10:51 2015
> @@ -1146,19 +1146,38 @@ read_request:
>      }
>
[]
> +
> +        rindex = apr_hash_first(NULL, c->filters);
> +        while (rindex) {
> +            ap_filter_t *f = apr_hash_this_val(rindex);
> +
> +            if (!APR_BRIGADE_EMPTY(f->bb)) {
> +
> +                rv = ap_pass_brigade(f, c->empty);
> +                apr_brigade_cleanup(c->empty);
> +                if (APR_SUCCESS != rv) {
> +                    ap_log_cerror(
> +                            APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
> +                            "write failure in '%s' output filter", 
> f->frec->name);
> +                    break;
> +                }
> +
> +                if (ap_filter_should_yield(f)) {
> +                    data_in_output_filters = 1;
> +                }
> +            }
> +
> +            rindex = apr_hash_next(rindex);
>          }

This pattern seems to be shared by all (relevant) MPMs; couldn't we
factor it out into a function? Maybe ap_reinstate_conn()?

Also, it seems that we leak the hash iterator here (on c->pool).
Couldn't we use a singly linked list for c->filters, since
ap_filter_t already has a 'next' field?

[]
>
> Modified: httpd/httpd/trunk/server/mpm/motorz/motorz.c
> URL: 
> http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/motorz/motorz.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/server/mpm/motorz/motorz.c (original)
> +++ httpd/httpd/trunk/server/mpm/motorz/motorz.c Sun Oct  4 10:10:51 2015
> @@ -359,21 +359,38 @@ static apr_status_t motorz_io_process(mo
[]
> +
> +            rindex = apr_hash_first(NULL, c->filters);
> +            while (rindex) {
> +                ap_filter_t *f = apr_hash_this_val(rindex);
> +
> +                if (!APR_BRIGADE_EMPTY(f->bb)) {
>
> -            rv = output_filter->frec->filter_func.out_func(output_filter,
> -                                                           NULL);
> +                    rv = ap_pass_brigade(f, c->empty);
> +                    apr_brigade_cleanup(c->empty);
> +                    if (APR_SUCCESS != rv) {
> +                        ap_log_cerror(
> +                                APLOG_MARK, APLOG_DEBUG, rv, c, 
> APLOGNO(02848)
> +                                "write failure in '%s' output filter", 
> f->frec->name);
> +                        break;
> +                    }
> +
> +                    if (ap_filter_should_yield(f)) {
> +                        data_in_output_filters = 1;
> +                    }
> +                }
> +
> +                rindex = apr_hash_next(rindex);
> +            }

Same pattern here.

[]
> Modified: httpd/httpd/trunk/server/mpm/simple/simple_io.c
> URL: 
> http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/simple/simple_io.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/server/mpm/simple/simple_io.c (original)
> +++ httpd/httpd/trunk/server/mpm/simple/simple_io.c Sun Oct  4 10:10:51 2015
> @@ -92,20 +92,37 @@ static apr_status_t simple_io_process(si
[]
> +
> +            rindex = apr_hash_first(NULL, c->filters);
> +            while (rindex) {
> +                ap_filter_t *f = apr_hash_this_val(rindex);
> +
> +                if (!APR_BRIGADE_EMPTY(f->bb)) {
>
> -            rv = output_filter->frec->filter_func.out_func(output_filter,
> -                                                           NULL);
> +                    rv = ap_pass_brigade(f, c->empty);
> +                    apr_brigade_cleanup(c->empty);
> +                    if (APR_SUCCESS != rv) {
> +                        ap_log_cerror(
> +                                APLOG_MARK, APLOG_DEBUG, rv, c, 
> APLOGNO(00249)
> +                                "write failure in '%s' output filter", 
> f->frec->name);
> +                        break;
> +                    }
> +
> +                    if (ap_filter_should_yield(f)) {
> +                        data_in_output_filters = 1;
> +                    }
> +                }
> +
> +                rindex = apr_hash_next(rindex);
> +            }

And here.

[]
>
> Modified: httpd/httpd/trunk/server/util_filter.c
> URL: 
> http://svn.apache.org/viewvc/httpd/httpd/trunk/server/util_filter.c?rev=1706669&r1=1706668&r2=1706669&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/server/util_filter.c (original)
> +++ httpd/httpd/trunk/server/util_filter.c Sun Oct  4 10:10:51 2015
[]
> @@ -635,7 +653,8 @@ AP_DECLARE(apr_status_t) ap_save_brigade
>      apr_status_t rv, srv = APR_SUCCESS;
>
>      /* If have never stored any data in the filter, then we had better
> -     * create an empty bucket brigade so that we can concat.
> +     * create an empty bucket brigade so that we can concat. Register
> +     * a cleanup to zero out the pointer if the pool is cleared.

Maybe this comment belongs in ap_filter_setaside_brigade()?

>       */
>      if (!(*saveto)) {
>          *saveto = apr_brigade_create(p, f->c->bucket_alloc);
> @@ -673,6 +692,248 @@ AP_DECLARE(apr_status_t) ap_save_brigade
>      return srv;
>  }
>
> +static apr_status_t filters_cleanup(void *data)
> +{
> +    ap_filter_t **key = data;
> +
> +    apr_hash_set((*key)->c->filters, key, sizeof(ap_filter_t **), NULL);
> +
> +    return APR_SUCCESS;
> +}

Shouldn't we call filters_cleanup() from ap_remove_output_filter() too?

> +
> +AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
> +        apr_bucket_brigade *bb)
> +{
> +    int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
> +
> +    if (loglevel >= APLOG_TRACE6) {
> +        ap_log_cerror(
> +            APLOG_MARK, APLOG_TRACE6, 0, f->c,
> +            "setaside %s brigade to %s brigade in '%s' output filter",
> +            (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"),
> +            (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"), 
> f->frec->name);
> +    }
> +
> +    if (!APR_BRIGADE_EMPTY(bb)) {
> +        apr_pool_t *pool;
> +        /*
> +         * Set aside the brigade bb within f->bb.
> +         */
> +        if (!f->bb) {
> +            ap_filter_t **key;
> +
> +            pool = f->r ? f->r->pool : f->c->pool;
> +
> +            key = apr_palloc(pool, sizeof(ap_filter_t **));
> +            *key = f;
> +            apr_hash_set(f->c->filters, key, sizeof(ap_filter_t **), f);

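Why not simply use:
   key = apr_pmemdup(pool, &f, sizeof f);
   apr_hash_set(f->c->filters, key, sizeof f, f);
here (apr_hash_set() stores the key pointer without copying it, so the
key must live in the pool rather than on the stack), and:
   ap_filter_t *f = data;
   apr_hash_set(f->c->filters, &f, sizeof f, NULL);
in filters_cleanup() above (registering f itself as the cleanup data)?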

A linked list (using the 'next' field) could possibly be simpler here too,
though filters_cleanup() above would then have to walk the list from its
head each time to remove an entry.

> +
> +            f->bb = apr_brigade_create(pool, f->c->bucket_alloc);
> +
> +            apr_pool_pre_cleanup_register(pool, key, filters_cleanup);
> +
> +        }
> +
> +        /* decide what pool we setaside to, request pool or deferred pool? */
> +        if (f->r) {
> +            pool = f->r->pool;
> +            APR_BRIGADE_CONCAT(f->bb, bb);
> +        }
> +        else {
> +            if (!f->deferred_pool) {
> +                apr_pool_create(&f->deferred_pool, f->c->pool);
> +                apr_pool_tag(f->deferred_pool, "deferred_pool");
> +            }
> +            pool = f->deferred_pool;
> +            return ap_save_brigade(f, &f->bb, &bb, pool);
> +        }

Shouldn't ap_save_brigade() be moved below the "else"?

> +
> +    }
> +    else if (f->deferred_pool) {
> +        /*
> +         * There are no more requests in the pipeline. We can just clear the
> +         * pool.
> +         */
> +        apr_brigade_cleanup(f->bb);
> +        apr_pool_clear(f->deferred_pool);
> +    }
> +    return APR_SUCCESS;
> +}
> +
> +AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f,
> +                                                     apr_bucket_brigade *bb,
> +                                                     apr_bucket **flush_upto)
> +{
[]
> +    apr_bucket *bucket, *next;
> +    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
> +    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
> +    int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
> +
> +    if (loglevel >= APLOG_TRACE6) {
> +        ap_log_cerror(
> +            APLOG_MARK, APLOG_TRACE6, 0, f->c,
> +            "reinstate %s brigade to %s brigade in '%s' output filter",
> +            (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"),
> +            (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), f->frec->name);
> +    }
> +
> +    if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
> +        APR_BRIGADE_PREPEND(bb, f->bb);
> +    }
> +
> +    /*
> +     * Determine if and up to which bucket we need to do a blocking write:
> +     *
> +     *  a) The brigade contains a flush bucket: Do a blocking write
> +     *     of everything up that point.
> +     *
> +     *  b) The request is in CONN_STATE_HANDLER state, and the brigade
> +     *     contains at least THRESHOLD_MAX_BUFFER bytes in non-file
> +     *     buckets: Do blocking writes until the amount of data in the
> +     *     buffer is less than THRESHOLD_MAX_BUFFER.  (The point of this
> +     *     rule is to provide flow control, in case a handler is
> +     *     streaming out lots of data faster than the data can be
> +     *     sent to the client.)
> +     *
> +     *  c) The request is in CONN_STATE_HANDLER state, and the brigade
> +     *     contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
> +     *     Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
> +     *     buckets are left. (The point of this rule is to prevent too many
> +     *     FDs being kept open by pipelined requests, possibly allowing a
> +     *     DoS).
> +     *
> +     *  d) The request is being served by a connection filter and the
> +     *     brigade contains a morphing bucket: If there was no other
> +     *     reason to do a blocking write yet, try reading the bucket. If its
> +     *     contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
> +     *     everything is fine. Otherwise we need to do a blocking write the
> +     *     up to and including the morphing bucket, because ap_save_brigade()
> +     *     would read the whole bucket into memory later on.
> +     */
> +
> +    *flush_upto = NULL;
> +
> +    bytes_in_brigade = 0;
> +    non_file_bytes_in_brigade = 0;
> +    eor_buckets_in_brigade = 0;
> +    morphing_bucket_in_brigade = 0;

Per the earlier suggestion to make ap_filter_data opaque, these could
be part of the struct (and thus reentrant).
We could then PREPEND after the loop below, and potentially avoid
walking the same buckets each time.

> +
> +    for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
> +         bucket = next) {
> +        next = APR_BUCKET_NEXT(bucket);
> +
> +        if (!APR_BUCKET_IS_METADATA(bucket)) {
> +            if (bucket->length == (apr_size_t)-1) {
> +                /*
> +                 * A setaside of morphing buckets would read everything into
> +                 * memory. Instead, we will flush everything up to and
> +                 * including this bucket.
> +                 */
> +                morphing_bucket_in_brigade = 1;
> +            }
> +            else {
> +                bytes_in_brigade += bucket->length;
> +                if (!APR_BUCKET_IS_FILE(bucket))
> +                    non_file_bytes_in_brigade += bucket->length;
> +            }
> +        }
> +        else if (AP_BUCKET_IS_EOR(bucket)) {
> +            eor_buckets_in_brigade++;
> +        }
> +
> +        if (APR_BUCKET_IS_FLUSH(bucket)
> +            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
> +            || (!f->r && morphing_bucket_in_brigade)
> +            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
> +            /* this segment of the brigade MUST be sent before returning. */
> +
> +            if (loglevel >= APLOG_TRACE6) {
> +                char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
> +                               "FLUSH bucket" :
> +                               (non_file_bytes_in_brigade >= 
> THRESHOLD_MAX_BUFFER) ?
> +                               "THRESHOLD_MAX_BUFFER" :
> +                               (!f->r && morphing_bucket_in_brigade) ? 
> "morphing bucket" :
> +                               "MAX_REQUESTS_IN_PIPELINE";
> +                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
> +                              "will flush because of %s", reason);
> +                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
> +                              "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
> +                              ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
> +                              "buckets: %d, morphing buckets: %d",
> +                              flush_upto == NULL ? " so far"
> +                                                 : " since last flush point",
> +                              bytes_in_brigade,
> +                              non_file_bytes_in_brigade,
> +                              eor_buckets_in_brigade,
> +                              morphing_bucket_in_brigade);
> +            }
> +            /*
> +             * Defer the actual blocking write to avoid doing many writes.
> +             */
> +            *flush_upto = next;
> +
> +            bytes_in_brigade = 0;
> +            non_file_bytes_in_brigade = 0;
> +            eor_buckets_in_brigade = 0;
> +            morphing_bucket_in_brigade = 0;
> +        }
> +    }

Regards,
Yann.

Reply via email to