On Thu, Mar 26, 2020 at 12:18 PM Ruediger Pluem <rpl...@apache.org> wrote:
>
> On 3/26/20 12:11 AM, Graham Leggett wrote:
> >
> > What it does do is work around buggy request filters (set it to connection)
> > or buggy connection filters (set it to network and get 2.4 behaviour)..

What it does is read/morph buckets before the connection filters; is that
because we can't pass a morphing bucket from some "buggy" request filter to
the core/ssl output filters? Or, more exactly, to a later
ap_filter_setaside_brigade()?

If some filter (besides the core ones) needs to apr_bucket_read() morphing
buckets, it should possibly use an ap_filter_reinstate_brigade_ex() with a
new AP_FILTER_REINSTATE_MORPH option (probably accompanied by
AP_FILTER_REINSTATE_NONBLOCKING for the usual/optional "nonblocking first,
then flush, then blocking" dance).

> >
> > Set AsyncFilter appropriately and you’ll at least narrow it down.
> >
> > The question you’re asking is “why is is an async path being taken when
> > AP_MPMQ_IS_ASYNC is false”. The setasides and reinstates should be noops in
> > this situation.
>
> Having the setasides and reinstates as noops in this situation might help,
> but as far as I can tell they make no difference
> whether the MPM is sync or async.
> What makes you think that an async path is taken?

Not until WRITE_COMPLETION.

> Setting AsyncFilter to connection will very likely help (not tested yet) as
> it effectively removes the ap_request_core_filter.

I think I'm convinced we don't need the core request filter. It seems to do
nothing different from the next ssl_io_filter_output() or
ap_core_output_filter(), so is it just a matter of the morphing buckets'
lifetime?

Thanks to the EOR bucket mechanism, we assume that objects owned by a
request can be used safely provided they precede the EOR, an assumption
which IMHO we can extend to morphing buckets.

If so, I think we can avoid the apr_bucket_setaside() = ENOTIMPL issue by
simply moving morphing buckets between the user and filter brigades in
ap_filter_setaside/reinstate_brigade(). Just like FILE buckets, morphing
buckets are just file descriptors that use no memory until read, so if we
move them on setaside/reinstate we can just ignore them for the flushing
heuristic and the flush_max_threshold accounting.
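
To make that accounting concrete, here is a minimal standalone sketch of a
flush-point scan that counts only in-memory bytes. It is an illustration
under stated assumptions, not code from the attached patch: toy_kind,
toy_bucket, TOY_IS_MORPHING and toy_flush_upto are made-up names, and no
APR or httpd API is used.

```c
#include <stddef.h>

/* Toy stand-ins for bucket kinds; hypothetical, not the APR types. */
enum toy_kind { TOY_HEAP, TOY_FILE, TOY_PIPE, TOY_FLUSH, TOY_EOR };

struct toy_bucket {
    enum toy_kind kind;
    size_t length;      /* (size_t)-1 while unread (morphing), 0 for metadata */
};

/* Same test as the length check used for morphing buckets in the thread. */
#define TOY_IS_MORPHING(b) ((b)->length == (size_t)-1)

/*
 * Return the index one past the last bucket that must be written before
 * returning, or 0 if nothing has to be flushed yet. FILE and morphing
 * buckets hold no data in memory, so they never count toward the threshold.
 */
size_t toy_flush_upto(const struct toy_bucket *b, size_t n,
                      size_t max_threshold, int max_pipelined)
{
    size_t memory_bytes = 0, flush_upto = 0, i;
    int eor_buckets = 0;

    for (i = 0; i < n; i++) {
        if (b[i].kind == TOY_EOR) {
            eor_buckets++;
        }
        else if (TOY_IS_MORPHING(&b[i])) {
            /* nothing read yet, nothing to account */
        }
        else if (b[i].kind != TOY_FILE) {
            memory_bytes += b[i].length;
        }

        if (b[i].kind == TOY_FLUSH
            || memory_bytes > max_threshold
            || eor_buckets > max_pipelined) {
            /* everything up to and including this bucket must go out */
            flush_upto = i + 1;
            memory_bytes = 0;
            eor_buckets = 0;
        }
    }
    return flush_upto;
}
```

With a 200-byte threshold, a brigade made of a 100-byte heap bucket, an
unread pipe bucket, a 1 MB file bucket and a 50-byte heap bucket yields no
flush point in this sketch, since only 150 bytes count as in-memory.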

So ap_filter_reinstate_brigade() would set *flush_upto based on in-memory
limits only (and FLUSH of course); anything else can wait. The blocking
limit is then flush_max_pipelined, but we could split flush_max_threshold
into finer-grained _max_in_memory_threshold and _max_total_threshold
settings if needed.

Wouldn't that work (patch attached, passes the test framework and Joe's
repro)?

Regards,
Yann.

Index: server/core.c
===================================================================
--- server/core.c	(revision 1875498)
+++ server/core.c	(working copy)
@@ -122,7 +122,6 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, insert_n
 
 /* Handles for core filters */
 AP_DECLARE_DATA ap_filter_rec_t *ap_subreq_core_filter_handle;
-AP_DECLARE_DATA ap_filter_rec_t *ap_request_core_filter_handle;
 AP_DECLARE_DATA ap_filter_rec_t *ap_core_output_filter_handle;
 AP_DECLARE_DATA ap_filter_rec_t *ap_content_length_filter_handle;
 AP_DECLARE_DATA ap_filter_rec_t *ap_core_input_filter_handle;
@@ -5916,9 +5915,6 @@ static void register_hooks(apr_pool_t *p)
     ap_core_output_filter_handle =
         ap_register_output_filter("CORE", ap_core_output_filter,
                                   NULL, AP_FTYPE_NETWORK);
-    ap_request_core_filter_handle =
-        ap_register_output_filter("REQ_CORE", ap_request_core_filter,
-                                  NULL, AP_FTYPE_CONNECTION - 1);
     ap_subreq_core_filter_handle =
         ap_register_output_filter("SUBREQ_CORE", ap_sub_req_output_filter,
                                   NULL, AP_FTYPE_CONTENT_SET);
Index: modules/http/http_core.c
===================================================================
--- modules/http/http_core.c	(revision 1875498)
+++ modules/http/http_core.c	(working copy)
@@ -268,8 +268,6 @@ static int http_create_request(request_rec *r)
                                     NULL, r, r->connection);
         ap_add_output_filter_handle(ap_http_outerror_filter_handle,
                                     NULL, r, r->connection);
-        ap_add_output_filter_handle(ap_request_core_filter_handle,
-                                    NULL, r, r->connection);
     }
 
     return OK;
Index: server/request.c
===================================================================
--- server/request.c	(revision 1875498)
+++ server/request.c	(working copy)
@@ -2058,124 +2058,6 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_sub_req_ou
     return APR_SUCCESS;
 }
 
-AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
-                                                            apr_bucket_brigade *bb)
-{
-    apr_status_t status = APR_SUCCESS;
-    apr_read_type_e block = APR_NONBLOCK_READ;
-    conn_rec *c = f->r->connection;
-    apr_bucket *flush_upto = NULL;
-    apr_bucket_brigade *tmp_bb;
-    apr_size_t tmp_bb_len = 0;
-    core_server_config *conf;
-    int seen_eor = 0;
-
-    /*
-     * Handle the AsyncFilter directive. We limit the filters that are
-     * eligible for asynchronous handling here.
-     */
-    if (f->frec->ftype < f->c->async_filter) {
-        ap_remove_output_filter(f);
-        return ap_pass_brigade(f->next, bb);
-    }
-
-    conf = ap_get_core_module_config(f->r->server->module_config);
-
-    /* Reinstate any buffered content */
-    ap_filter_reinstate_brigade(f, bb, &flush_upto);
-
-    /* After EOR is passed downstream, anything pooled on the request may
-     * be destroyed (including bb!), but not tmp_bb which is acquired from
-     * c->pool (and released after the below loop).
-     */
-    tmp_bb = ap_acquire_brigade(f->c);
-
-    /* Don't touch *bb after seen_eor */
-    while (status == APR_SUCCESS && !seen_eor && !APR_BRIGADE_EMPTY(bb)) {
-        apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
-        int do_pass = 0;
-
-        if (AP_BUCKET_IS_EOR(bucket)) {
-            /* pass out everything and never come back again,
-             * r is destroyed with this bucket!
-             */
-            APR_BRIGADE_CONCAT(tmp_bb, bb);
-            ap_remove_output_filter(f);
-            seen_eor = 1;
-        }
-        else {
-            /* if the core has set aside data, back off and try later */
-            if (!flush_upto) {
-                if (ap_filter_should_yield(f->next)) {
-                    break;
-                }
-            }
-            else if (flush_upto == bucket) {
-                flush_upto = NULL;
-            }
-
-            /* have we found a morphing bucket? if so, force it to morph into
-             * something safe to pass down to the connection filters without
-             * needing to be set aside.
-             */
-            if (bucket->length == (apr_size_t)-1) {
-                const char *data;
-                apr_size_t size;
-
-                status = apr_bucket_read(bucket, &data, &size, block);
-                if (status != APR_SUCCESS) {
-                    if (!APR_STATUS_IS_EAGAIN(status)
-                            || block != APR_NONBLOCK_READ) {
-                        break;
-                    }
-                    /* Flush everything so far and retry in blocking mode */
-                    bucket = apr_bucket_flush_create(c->bucket_alloc);
-                    block = APR_BLOCK_READ;
-                }
-                else {
-                    tmp_bb_len += size;
-                    block = APR_NONBLOCK_READ;
-                }
-            }
-            else {
-                tmp_bb_len += bucket->length;
-            }
-
-            /* move the bucket to tmp_bb and check whether it exhausts bb or
-             * brings tmp_bb above the limit; in both cases it's time to pass
-             * everything down the chain.
-             */
-            APR_BUCKET_REMOVE(bucket);
-            APR_BRIGADE_INSERT_TAIL(tmp_bb, bucket);
-            if (APR_BRIGADE_EMPTY(bb)
-                    || APR_BUCKET_IS_FLUSH(bucket)
-                    || tmp_bb_len >= conf->flush_max_threshold) {
-                do_pass = 1;
-            }
-        }
-
-        if (do_pass || seen_eor) {
-            status = ap_pass_brigade(f->next, tmp_bb);
-            apr_brigade_cleanup(tmp_bb);
-            tmp_bb_len = 0;
-        }
-    }
-
-    /* Don't touch *bb after seen_eor */
-    if (!seen_eor) {
-        apr_status_t rv;
-        APR_BRIGADE_PREPEND(bb, tmp_bb);
-        rv = ap_filter_setaside_brigade(f, bb);
-        if (status == APR_SUCCESS) {
-            status = rv;
-        }
-    }
-
-    ap_release_brigade(f->c, tmp_bb);
-
-    return status;
-}
-
 extern APR_OPTIONAL_FN_TYPE(authz_some_auth_required) *ap__authz_ap_some_auth_required;
 
 AP_DECLARE(int) ap_some_auth_required(request_rec *r)
Index: modules/http/http_request.c
===================================================================
--- modules/http/http_request.c	(revision 1875498)
+++ modules/http/http_request.c	(working copy)
@@ -350,7 +350,6 @@ AP_DECLARE(void) ap_process_request_after_handler(
     apr_bucket_brigade *bb;
     apr_bucket *b;
     conn_rec *c = r->connection;
-    ap_filter_t *f;
 
     bb = ap_acquire_brigade(c);
 
@@ -371,15 +370,9 @@ AP_DECLARE(void) ap_process_request_after_handler(
 
     /* All the request filters should have bailed out on EOS, and in any
      * case they shouldn't have to handle this EOR which will destroy the
-     * request underneath them. So go straight to the core request filter
-     * which (if any) will take care of the setaside buckets.
+     * request underneath them. So go straight to the connection filters.
      */
-    for (f = r->output_filters; f; f = f->next) {
-        if (f->frec == ap_request_core_filter_handle) {
-            break;
-        }
-    }
-    ap_pass_brigade(f ? f : c->output_filters, bb);
+    ap_pass_brigade(c->output_filters, bb);
 
     /* The EOR bucket has either been handled by an output filter (eg.
      * deleted or moved to a buffered_bb => no more in bb), or an error
Index: include/http_request.h
===================================================================
--- include/http_request.h	(revision 1875498)
+++ include/http_request.h	(working copy)
@@ -149,18 +149,6 @@ AP_DECLARE(int) ap_run_sub_req(request_rec *r);
  */
 AP_DECLARE(void) ap_destroy_sub_req(request_rec *r);
 
-/**
- * An output filter to ensure that we avoid passing morphing buckets to
- * connection filters and in so doing defeat async write completion when
- * they are set aside. This should be inserted at the end of a request
- * filter stack.
- * @param f The current filter
- * @param bb The brigade to filter
- * @return status code
- */
-AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
-                                                            apr_bucket_brigade *bb);
-
 /*
  * Then there's the case that you want some other request to be served
  * as the top-level request INSTEAD of what the client requested directly.
@@ -601,6 +589,15 @@ AP_DECLARE(int) ap_if_walk(request_rec *r);
 AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_eor;
 
 /**
+ * Determine if a bucket is morphing, that is which changes its
+ * type on read (usually to "heap" allocated data), while moving
+ * itself at the next position to remain plugged until exhausted.
+ * @param e The bucket to inspect
+ * @return true or false
+ */
+#define AP_BUCKET_IS_MORPHING(e)    ((e)->length == (apr_size_t)-1)
+
+/**
  * Determine if a bucket is an End Of REQUEST (EOR) bucket
  * @param e The bucket to inspect
  * @return true or false
Index: server/util_filter.c
===================================================================
--- server/util_filter.c	(revision 1875498)
+++ server/util_filter.c	(working copy)
@@ -934,9 +934,20 @@ AP_DECLARE(int) ap_filter_prepare_brigade(ap_filte
     return OK;
 }
 
+static apr_status_t save_aside_brigade(struct ap_filter_private *fp,
+                                       apr_bucket_brigade *bb)
+{
+    if (!fp->deferred_pool) {
+        apr_pool_create(&fp->deferred_pool, fp->f->c->pool);
+        apr_pool_tag(fp->deferred_pool, "deferred_pool");
+    }
+    return ap_save_brigade(fp->f, &fp->bb, &bb, fp->deferred_pool);
+}
+
 AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
                                                     apr_bucket_brigade *bb)
 {
+    apr_status_t rv = APR_SUCCESS;
     struct ap_filter_private *fp = f->priv;
 
     ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
@@ -945,34 +956,56 @@ AP_DECLARE(apr_status_t) ap_filter_setaside_brigad
                   (!fp->bb || APR_BRIGADE_EMPTY(fp->bb)) ? "empty" : "full",
                   f->frec->name);
 
+    /* Buckets in fp->bb are leftover from previous call to reinstate, so
+     * they happen after the buckets (re)set aside here.
+     */
+    if (fp->bb) {
+        APR_BRIGADE_CONCAT(bb, fp->bb);
+    }
+
     if (!APR_BRIGADE_EMPTY(bb)) {
+        apr_bucket_brigade *tmp_bb = NULL;
+
         /*
-         * Set aside the brigade bb within fp->bb.
+         * Set aside the brigade bb to fp->bb.
          */
+
         ap_filter_prepare_brigade(f);
+        do {
+            apr_bucket *e = APR_BRIGADE_FIRST(bb);
+            APR_BUCKET_REMOVE(e);
 
-        /* decide what pool we setaside to, request pool or deferred pool? */
-        if (f->r) {
-            apr_bucket *e;
-            for (e = APR_BRIGADE_FIRST(bb); e != APR_BRIGADE_SENTINEL(bb); e =
-                    APR_BUCKET_NEXT(e)) {
-                if (APR_BUCKET_IS_TRANSIENT(e)) {
-                    int rv = apr_bucket_setaside(e, f->r->pool);
+            /* Assume that morphing buckets have the correct lifetime! */
+            if (AP_BUCKET_IS_MORPHING(e)) {
+                if (tmp_bb && !APR_BRIGADE_EMPTY(tmp_bb)) {
+                    /* Save non-morphing buckets batched below. */
+                    rv = save_aside_brigade(fp, tmp_bb);
                     if (rv != APR_SUCCESS) {
-                        return rv;
+                        APR_BRIGADE_PREPEND(bb, tmp_bb);
+                        break;
                     }
                 }
+                APR_BRIGADE_INSERT_TAIL(fp->bb, e);
             }
-            APR_BRIGADE_CONCAT(fp->bb, bb);
-        }
-        else {
-            if (!fp->deferred_pool) {
-                apr_pool_create(&fp->deferred_pool, f->c->pool);
-                apr_pool_tag(fp->deferred_pool, "deferred_pool");
+            else {
+                /* Save calls to ap_save_brigade() by batching successive
+                 * non-morping buckets into a temporary brigade.
+                 */
+                if (!tmp_bb) {
+                    tmp_bb = ap_acquire_brigade(f->c);
+                }
+                APR_BRIGADE_INSERT_TAIL(tmp_bb, e);
             }
-            return ap_save_brigade(f, &fp->bb, &bb, fp->deferred_pool);
+        } while (!APR_BRIGADE_EMPTY(bb));
+
+        if (tmp_bb) {
+            /* Save any remainder. */
+            if (!APR_BRIGADE_EMPTY(tmp_bb)) {
+                rv = save_aside_brigade(fp, tmp_bb);
+                APR_BRIGADE_PREPEND(bb, tmp_bb);
+            }
+            ap_release_brigade(f->c, tmp_bb);
         }
-
     }
     else if (fp->deferred_pool) {
         /*
@@ -983,7 +1016,8 @@ AP_DECLARE(apr_status_t) ap_filter_setaside_brigad
         apr_brigade_cleanup(fp->bb);
         apr_pool_clear(fp->deferred_pool);
     }
-    return APR_SUCCESS;
+
+    return rv;
 }
 
 void ap_filter_adopt_brigade(ap_filter_t *f, apr_bucket_brigade *bb)
@@ -1007,8 +1041,8 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
                                                      apr_bucket **flush_upto)
 {
     apr_bucket *bucket, *next;
-    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
-    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+    apr_size_t bytes_in_brigade, memory_bytes_in_brigade;
+    int eor_buckets_in_brigade, morphing_buckets_in_brigade;
     struct ap_filter_private *fp = f->priv;
     core_server_config *conf;
 
@@ -1018,6 +1052,9 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
                   (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"),
                   f->frec->name);
 
+    /* Buckets in fp->bb are leftover from previous call to setaside, so
+     * they happen before the buckets reinstated (and added) here.
+     */
     if (fp->bb) {
         APR_BRIGADE_PREPEND(bb, fp->bb);
     }
@@ -1029,68 +1066,58 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
     *flush_upto = NULL;
 
     /*
-     * Determine if and up to which bucket we need to do a blocking write:
+     * Determine if and up to which bucket the caller needs to do a blocking
+     * write:
      *
-     *  a) The brigade contains a flush bucket: Do a blocking write
-     *     of everything up that point.
+     *  a) The brigade contains at least one flush bucket: do blocking writes
+     *     of everything up to the last one.
      *
-     *  b) The request is in CONN_STATE_HANDLER state, and the brigade
-     *     contains at least flush_max_threshold bytes in non-file
-     *     buckets: Do blocking writes until the amount of data in the
-     *     buffer is less than flush_max_threshold.  (The point of this
-     *     rule is to provide flow control, in case a handler is
-     *     streaming out lots of data faster than the data can be
+     *  b) The brigade contains at least flush_max_threshold bytes in memory,
+     *     that is non-file and non-morphing buckets: do blocking writes of
+     *     everything up the last bucket above flush_max_threshold.
+     *     (The point of this rule is to provide flow control, in case a
+     *     handler is streaming out lots of data faster than the data can be
      *     sent to the client.)
      *
-     *  c) The request is in CONN_STATE_HANDLER state, and the brigade
-     *     contains at least flush_max_pipelined EOR buckets:
-     *     Do blocking writes until less than flush_max_pipelined EOR
-     *     buckets are left. (The point of this rule is to prevent too many
-     *     FDs being kept open by pipelined requests, possibly allowing a
-     *     DoS).
+     *  c) The brigade contains at least flush_max_pipelined EOR buckets: do
+     *     blocking writes until the last EOR above flush_max_pipelined.
+     *     (The point of this rule is to prevent too many FDs being kept open
+     *     by pipelined requests, possibly allowing a DoS).
      *
-     *  d) The request is being served by a connection filter and the
-     *     brigade contains a morphing bucket: If there was no other
-     *     reason to do a blocking write yet, try reading the bucket. If its
-     *     contents fit into memory before flush_max_threshold is reached,
-     *     everything is fine. Otherwise we need to do a blocking write the
-     *     up to and including the morphing bucket, because ap_save_brigade()
-     *     would read the whole bucket into memory later on.
+     * Note that morphing buckets use no memory until read, so they don't
+     * account for point b) above. Both ap_filter_reinstate_brigade() and
+     * ap_filter_setaside_brigade() assume that morphing buckets have an
+     * appropriate lifetime (until next EOR for instance), so they are simply
+     * setaside or reinstated by moving them from/to fp->bb to/from bb.
      */
 
     bytes_in_brigade = 0;
-    non_file_bytes_in_brigade = 0;
+    memory_bytes_in_brigade = 0;
     eor_buckets_in_brigade = 0;
-    morphing_bucket_in_brigade = 0;
+    morphing_buckets_in_brigade = 0;
 
-    conf = ap_get_core_module_config(f->c->base_server->module_config);
+    conf = f->r ? ap_get_core_module_config(f->r->server->module_config)
+                : ap_get_core_module_config(f->c->base_server->module_config);
 
     for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
          bucket = next) {
         next = APR_BUCKET_NEXT(bucket);
 
-        if (!APR_BUCKET_IS_METADATA(bucket)) {
-            if (bucket->length == (apr_size_t)-1) {
-                /*
-                 * A setaside of morphing buckets would read everything into
-                 * memory. Instead, we will flush everything up to and
-                 * including this bucket.
-                 */
-                morphing_bucket_in_brigade = 1;
+        if (AP_BUCKET_IS_EOR(bucket)) {
+            eor_buckets_in_brigade++;
+        }
+        else if (AP_BUCKET_IS_MORPHING(bucket)) {
+            morphing_buckets_in_brigade++;
+        }
+        else if (bucket->length) {
+            bytes_in_brigade += bucket->length;
+            if (!APR_BUCKET_IS_FILE(bucket)) {
+                memory_bytes_in_brigade += bucket->length;
             }
-            else {
-                bytes_in_brigade += bucket->length;
-                if (!APR_BUCKET_IS_FILE(bucket))
-                    non_file_bytes_in_brigade += bucket->length;
-            }
         }
-        else if (AP_BUCKET_IS_EOR(bucket)) {
-            eor_buckets_in_brigade++;
-        }
 
         if (APR_BUCKET_IS_FLUSH(bucket)
-            || non_file_bytes_in_brigade >= conf->flush_max_threshold
-            || (!f->r && morphing_bucket_in_brigade)
+            || memory_bytes_in_brigade > conf->flush_max_threshold
             || eor_buckets_in_brigade > conf->flush_max_pipelined) {
             /* this segment of the brigade MUST be sent before returning. */
 
@@ -1097,22 +1124,20 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
             if (APLOGctrace6(f->c)) {
                 char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
                                "FLUSH bucket" :
-                               (non_file_bytes_in_brigade >= conf->flush_max_threshold) ?
-                               "max threshold" :
-                               (!f->r && morphing_bucket_in_brigade) ? "morphing bucket" :
-                               "max requests in pipeline";
+                               (memory_bytes_in_brigade >= conf->flush_max_threshold) ?
+                               "max threshold" : "max requests in pipeline";
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
                               "will flush because of %s", reason);
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
                               "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
-                              ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
+                              ", memory bytes: %" APR_SIZE_T_FMT ", eor "
                               "buckets: %d, morphing buckets: %d",
                               *flush_upto == NULL ? " so far"
                                                   : " since last flush point",
                               bytes_in_brigade,
-                              non_file_bytes_in_brigade,
+                              memory_bytes_in_brigade,
                               eor_buckets_in_brigade,
-                              morphing_bucket_in_brigade);
+                              morphing_buckets_in_brigade);
             }
             /*
              * Defer the actual blocking write to avoid doing many writes.
@@ -1120,18 +1145,19 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
              */
             *flush_upto = next;
             bytes_in_brigade = 0;
-            non_file_bytes_in_brigade = 0;
+            memory_bytes_in_brigade = 0;
             eor_buckets_in_brigade = 0;
-            morphing_bucket_in_brigade = 0;
+            morphing_buckets_in_brigade = 0;
         }
     }
 
     ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
-                  "brigade contains: bytes: %" APR_SIZE_T_FMT
+                  "brigade contains%s: bytes: %" APR_SIZE_T_FMT
                   ", non-file bytes: %" APR_SIZE_T_FMT
                   ", eor buckets: %d, morphing buckets: %d",
-                  bytes_in_brigade, non_file_bytes_in_brigade,
-                  eor_buckets_in_brigade, morphing_bucket_in_brigade);
+                  *flush_upto == NULL ? "" : " since last flush point",
+                  bytes_in_brigade, memory_bytes_in_brigade,
+                  eor_buckets_in_brigade, morphing_buckets_in_brigade);
 
     return APR_SUCCESS;
 }
@@ -1200,8 +1226,8 @@ AP_DECLARE_NONSTD(int) ap_filter_output_pending(co
     }
 
     /* Flush outer most filters first for ap_filter_should_yield(f->next)
-     * to be relevant in the previous ones (e.g. ap_request_core_filter()
-     * won't pass its buckets if its next filters yield already).
+     * to be relevant in the previous ones (async filters won't pass their
+     * buckets if their next filters yield already).
      */
     bb = ap_acquire_brigade(c);
     for (fp = APR_RING_LAST(x->pending_output_filters);