Thanks Graham for this great patch, comments inline... On Sun, Oct 4, 2015 at 12:10 PM, <minf...@apache.org> wrote: > Author: minfrin > Date: Sun Oct 4 10:10:51 2015 > New Revision: 1706669 > > URL: http://svn.apache.org/viewvc?rev=1706669&view=rev > Log: > core: Extend support for asynchronous write completion from the > network filter to any connection or request filter. > [] > > Modified: httpd/httpd/trunk/include/util_filter.h > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/include/util_filter.h?rev=1706669&r1=1706668&r2=1706669&view=diff > ============================================================================== > --- httpd/httpd/trunk/include/util_filter.h (original) > +++ httpd/httpd/trunk/include/util_filter.h Sun Oct 4 10:10:51 2015 > @@ -278,6 +278,13 @@ struct ap_filter_t { > * to the request_rec, except that it is used for connection filters. > */ > conn_rec *c; > + > + /** Buffered data associated with the current filter. */ > + apr_bucket_brigade *bb; > + > + /** Dedicated pool to use for deferred writes. */ > + apr_pool_t *deferred_pool;
Could we make these opaque, e.g.: # util_filter.h: struct ap_filter_data; struct ap_filter_t { ... struct ap_filter_data *data; }; # util_filter.c: struct ap_filter_data { /** Buffered data associated with the current filter. */ apr_bucket_brigade *bb; /** Dedicated pool to use for deferred writes. */ apr_pool_t *deferred_pool; }; This would prevent them from being mangled elsewhere (they are internal to util_filter.c anyway). It could also possibly avoid walking the brigade for every ap_filter_reinstate_brigade() call... > + > }; > [] > > Modified: httpd/httpd/trunk/modules/http/http_request.c > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http/http_request.c?rev=1706669&r1=1706668&r2=1706669&view=diff > ============================================================================== > --- httpd/httpd/trunk/modules/http/http_request.c (original) > +++ httpd/httpd/trunk/modules/http/http_request.c Sun Oct 4 10:10:51 2015 > @@ -256,6 +256,14 @@ AP_DECLARE(void) ap_process_request_afte > apr_bucket *b; > conn_rec *c = r->connection; > > + /* Find the last request, taking into account internal > + * redirects. We want to send the EOR bucket at the end of > + * all the buckets so it does not jump the queue. > + */ > + while (r->next) { > + r = r->next; > + } > + > /* Send an EOR bucket through the output filter chain.
When > * this bucket is destroyed, the request will be logged and > * its pool will be freed > @@ -264,8 +272,8 @@ AP_DECLARE(void) ap_process_request_afte > b = ap_bucket_eor_create(c->bucket_alloc, r); > APR_BRIGADE_INSERT_HEAD(bb, b); > > - ap_pass_brigade(c->output_filters, bb); > - > + ap_pass_brigade(r->output_filters, bb);;; Nit: trailing ";;" > + [] > Modified: httpd/httpd/trunk/server/mpm/event/event.c > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/event.c?rev=1706669&r1=1706668&r2=1706669&view=diff > ============================================================================== > --- httpd/httpd/trunk/server/mpm/event/event.c (original) > +++ httpd/httpd/trunk/server/mpm/event/event.c Sun Oct 4 10:10:51 2015 > @@ -1146,19 +1146,38 @@ read_request: > } > [] > + > + rindex = apr_hash_first(NULL, c->filters); > + while (rindex) { > + ap_filter_t *f = apr_hash_this_val(rindex); > + > + if (!APR_BRIGADE_EMPTY(f->bb)) { > + > + rv = ap_pass_brigade(f, c->empty); > + apr_brigade_cleanup(c->empty); > + if (APR_SUCCESS != rv) { > + ap_log_cerror( > + APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470) > + "write failure in '%s' output filter", > f->frec->name); > + break; > + } > + > + if (ap_filter_should_yield(f)) { > + data_in_output_filters = 1; > + } > + } > + > + rindex = apr_hash_next(rindex); > } This pattern seems to be shared by all (relevant) MPMs; couldn't we make it a function? Maybe ap_reinstate_conn()? Also, it seems that we leak the hash iterator here (on c->pool). Couldn't we use a singly linked list for c->filters, since ap_filter_t already has a 'next' field?
[] > > Modified: httpd/httpd/trunk/server/mpm/motorz/motorz.c > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/motorz/motorz.c?rev=1706669&r1=1706668&r2=1706669&view=diff > ============================================================================== > --- httpd/httpd/trunk/server/mpm/motorz/motorz.c (original) > +++ httpd/httpd/trunk/server/mpm/motorz/motorz.c Sun Oct 4 10:10:51 2015 > @@ -359,21 +359,38 @@ static apr_status_t motorz_io_process(mo [] > + > + rindex = apr_hash_first(NULL, c->filters); > + while (rindex) { > + ap_filter_t *f = apr_hash_this_val(rindex); > + > + if (!APR_BRIGADE_EMPTY(f->bb)) { > > - rv = output_filter->frec->filter_func.out_func(output_filter, > - NULL); > + rv = ap_pass_brigade(f, c->empty); > + apr_brigade_cleanup(c->empty); > + if (APR_SUCCESS != rv) { > + ap_log_cerror( > + APLOG_MARK, APLOG_DEBUG, rv, c, > APLOGNO(02848) > + "write failure in '%s' output filter", > f->frec->name); > + break; > + } > + > + if (ap_filter_should_yield(f)) { > + data_in_output_filters = 1; > + } > + } > + > + rindex = apr_hash_next(rindex); > + } Here. 
[] > Modified: httpd/httpd/trunk/server/mpm/simple/simple_io.c > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/simple/simple_io.c?rev=1706669&r1=1706668&r2=1706669&view=diff > ============================================================================== > --- httpd/httpd/trunk/server/mpm/simple/simple_io.c (original) > +++ httpd/httpd/trunk/server/mpm/simple/simple_io.c Sun Oct 4 10:10:51 2015 > @@ -92,20 +92,37 @@ static apr_status_t simple_io_process(si [] > + > + rindex = apr_hash_first(NULL, c->filters); > + while (rindex) { > + ap_filter_t *f = apr_hash_this_val(rindex); > + > + if (!APR_BRIGADE_EMPTY(f->bb)) { > > - rv = output_filter->frec->filter_func.out_func(output_filter, > - NULL); > + rv = ap_pass_brigade(f, c->empty); > + apr_brigade_cleanup(c->empty); > + if (APR_SUCCESS != rv) { > + ap_log_cerror( > + APLOG_MARK, APLOG_DEBUG, rv, c, > APLOGNO(00249) > + "write failure in '%s' output filter", > f->frec->name); > + break; > + } > + > + if (ap_filter_should_yield(f)) { > + data_in_output_filters = 1; > + } > + } > + > + rindex = apr_hash_next(rindex); > + } And here. [] > > Modified: httpd/httpd/trunk/server/util_filter.c > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/server/util_filter.c?rev=1706669&r1=1706668&r2=1706669&view=diff > ============================================================================== > --- httpd/httpd/trunk/server/util_filter.c (original) > +++ httpd/httpd/trunk/server/util_filter.c Sun Oct 4 10:10:51 2015 [] > @@ -635,7 +653,8 @@ AP_DECLARE(apr_status_t) ap_save_brigade > apr_status_t rv, srv = APR_SUCCESS; > > /* If have never stored any data in the filter, then we had better > - * create an empty bucket brigade so that we can concat. > + * create an empty bucket brigade so that we can concat. Register > + * a cleanup to zero out the pointer if the pool is cleared. Maybe this comment belongs in ap_filter_setaside_brigade()? 
> */ > if (!(*saveto)) { > *saveto = apr_brigade_create(p, f->c->bucket_alloc); > @@ -673,6 +692,248 @@ AP_DECLARE(apr_status_t) ap_save_brigade > return srv; > } > > +static apr_status_t filters_cleanup(void *data) > +{ > + ap_filter_t **key = data; > + > + apr_hash_set((*key)->c->filters, key, sizeof(ap_filter_t **), NULL); > + > + return APR_SUCCESS; > +} Shouldn't we call filters_cleanup() from ap_remove_output_filter() too? > + > +AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f, > + apr_bucket_brigade *bb) > +{ > + int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX); > + > + if (loglevel >= APLOG_TRACE6) { > + ap_log_cerror( > + APLOG_MARK, APLOG_TRACE6, 0, f->c, > + "setaside %s brigade to %s brigade in '%s' output filter", > + (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), > + (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"), > f->frec->name); > + } > + > + if (!APR_BRIGADE_EMPTY(bb)) { > + apr_pool_t *pool; > + /* > + * Set aside the brigade bb within f->bb. > + */ > + if (!f->bb) { > + ap_filter_t **key; > + > + pool = f->r ? f->r->pool : f->c->pool; > + > + key = apr_palloc(pool, sizeof(ap_filter_t **)); > + *key = f; > + apr_hash_set(f->c->filters, key, sizeof(ap_filter_t **), f); Why not simply use: key = apr_pmemdup(pool, &f, sizeof f); apr_hash_set(f->c->filters, key, sizeof f, f); here, and: ap_filter_t *f = data; apr_hash_set(f->c->filters, &f, sizeof f, NULL); in filters_cleanup() above? The linked list could possibly be simpler here too (using the 'next' field). The filters_cleanup() above would then have to start from the beginning of the list each time to remove the entries, though. > + > + f->bb = apr_brigade_create(pool, f->c->bucket_alloc); > + > + apr_pool_pre_cleanup_register(pool, key, filters_cleanup); > + > + } > + > + /* decide what pool we setaside to, request pool or deferred pool?
*/ > + if (f->r) { > + pool = f->r->pool; > + APR_BRIGADE_CONCAT(f->bb, bb); > + } > + else { > + if (!f->deferred_pool) { > + apr_pool_create(&f->deferred_pool, f->c->pool); > + apr_pool_tag(f->deferred_pool, "deferred_pool"); > + } > + pool = f->deferred_pool; > + return ap_save_brigade(f, &f->bb, &bb, pool); > + } Shouldn't ap_save_brigade() be moved below the "else"? > + > + } > + else if (f->deferred_pool) { > + /* > + * There are no more requests in the pipeline. We can just clear the > + * pool. > + */ > + apr_brigade_cleanup(f->bb); > + apr_pool_clear(f->deferred_pool); > + } > + return APR_SUCCESS; > +} > + > +AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f, > + apr_bucket_brigade *bb, > + apr_bucket **flush_upto) > +{ [] > + apr_bucket *bucket, *next; > + apr_size_t bytes_in_brigade, non_file_bytes_in_brigade; > + int eor_buckets_in_brigade, morphing_bucket_in_brigade; > + int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX); > + > + if (loglevel >= APLOG_TRACE6) { > + ap_log_cerror( > + APLOG_MARK, APLOG_TRACE6, 0, f->c, > + "reinstate %s brigade to %s brigade in '%s' output filter", > + (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"), > + (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), f->frec->name); > + } > + > + if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) { > + APR_BRIGADE_PREPEND(bb, f->bb); > + } > + > + /* > + * Determine if and up to which bucket we need to do a blocking write: > + * > + * a) The brigade contains a flush bucket: Do a blocking write > + * of everything up that point. > + * > + * b) The request is in CONN_STATE_HANDLER state, and the brigade > + * contains at least THRESHOLD_MAX_BUFFER bytes in non-file > + * buckets: Do blocking writes until the amount of data in the > + * buffer is less than THRESHOLD_MAX_BUFFER. (The point of this > + * rule is to provide flow control, in case a handler is > + * streaming out lots of data faster than the data can be > + * sent to the client.) 
> * > + * c) The request is in CONN_STATE_HANDLER state, and the brigade > + * contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets: > + * Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR > + * buckets are left. (The point of this rule is to prevent too many > + * FDs being kept open by pipelined requests, possibly allowing a > + * DoS). > + * > + * d) The request is being served by a connection filter and the > + * brigade contains a morphing bucket: If there was no other > + * reason to do a blocking write yet, try reading the bucket. If its > + * contents fit into memory before THRESHOLD_MAX_BUFFER is reached, > + * everything is fine. Otherwise we need to do a blocking write the > + * up to and including the morphing bucket, because ap_save_brigade() > + * would read the whole bucket into memory later on. > + */ > + > + *flush_upto = NULL; > + > + bytes_in_brigade = 0; > + non_file_bytes_in_brigade = 0; > + eor_buckets_in_brigade = 0; > + morphing_bucket_in_brigade = 0; Per the earlier suggestion to make ap_filter_data opaque, these could be part of the struct (and reentrant). We could then PREPEND after the loop below, and potentially avoid walking the same buckets each time. > +
> + */ > + morphing_bucket_in_brigade = 1; > + } > + else { > + bytes_in_brigade += bucket->length; > + if (!APR_BUCKET_IS_FILE(bucket)) > + non_file_bytes_in_brigade += bucket->length; > + } > + } > + else if (AP_BUCKET_IS_EOR(bucket)) { > + eor_buckets_in_brigade++; > + } > + > + if (APR_BUCKET_IS_FLUSH(bucket) > + || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER > + || (!f->r && morphing_bucket_in_brigade) > + || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) { > + /* this segment of the brigade MUST be sent before returning. */ > + > + if (loglevel >= APLOG_TRACE6) { > + char *reason = APR_BUCKET_IS_FLUSH(bucket) ? > + "FLUSH bucket" : > + (non_file_bytes_in_brigade >= > THRESHOLD_MAX_BUFFER) ? > + "THRESHOLD_MAX_BUFFER" : > + (!f->r && morphing_bucket_in_brigade) ? > "morphing bucket" : > + "MAX_REQUESTS_IN_PIPELINE"; > + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c, > + "will flush because of %s", reason); > + ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c, > + "seen in brigade%s: bytes: %" APR_SIZE_T_FMT > + ", non-file bytes: %" APR_SIZE_T_FMT ", eor " > + "buckets: %d, morphing buckets: %d", > + flush_upto == NULL ? " so far" > + : " since last flush point", > + bytes_in_brigade, > + non_file_bytes_in_brigade, > + eor_buckets_in_brigade, > + morphing_bucket_in_brigade); > + } > + /* > + * Defer the actual blocking write to avoid doing many writes. > + */ > + *flush_upto = next; > + > + bytes_in_brigade = 0; > + non_file_bytes_in_brigade = 0; > + eor_buckets_in_brigade = 0; > + morphing_bucket_in_brigade = 0; > + } > + } Regards, Yann.