On Thu, Jan 23, 2020 at 8:23 AM Pluem, Ruediger, Vodafone Group
<ruediger.pl...@vodafone.com> wrote:
> >
> > https://github.com/apache/httpd/pull/88
>
> Anyone had a look?

If I understand the patch correctly, ap_request_core_filter() would
let everything through on EOR, regardless of potential morphing
buckets set aside?
So if ap_request_core_filter() yields because of network contention,
setting aside all the remaining buckets (say including a morphing
one), and then ap_process_request_after_handler() sends the EOR (the
handler has finished sending everything on its side), it seems that
ap_request_core_filter() won't play its role anymore?
On top of that, ap_request_core_filter() filters "stack up", one per
request, so the oldest request's filter may set aside data for
newer ones, using its own (wrong) request pool.

How about we use a single ap_request_core_filter() for all the
requests, at AP_FTYPE_CONNECTION level, and have it "detect" requests
based on SOR/EOR ranges, where the SOR (Start Of Request) bucket is to
be inserted by the AP_FTYPE_PROTOCOL module (e.g.
ap_http_header_filter for http) before sending any data pertaining to
its request?

I tried that in the attached patch; it passes the test framework and
Joe's reproducer.
Thoughts?

Regards,
Yann.
Index: server/core.c
===================================================================
--- server/core.c	(revision 1875498)
+++ server/core.c	(working copy)
@@ -5417,6 +5478,24 @@ static int core_create_req(request_rec *r)
     return OK;
 }
 
+static int core_create_request(request_rec *r)
+{
+    int rc = core_create_req(r);
+    /* Once per connection (keyed on c->pool): add the REQ_CORE filter */
+    if (rc == OK && !r->main && !r->prev) {
+        ap_filter_t *f = NULL;
+        conn_rec *c = r->connection;
+        apr_pool_userdata_get((void **)&f, "req_core_filter", c->pool);
+        if (f == NULL) { /* nothing recorded on c->pool yet */
+            f = ap_add_output_filter_handle(ap_request_core_filter_handle,
+                                            NULL, NULL, c); /* no ctx, no r */
+            apr_pool_userdata_setn(f, "req_core_filter", NULL, c->pool);
+        }
+    }
+
+    return rc;
+}
+
 static int core_create_proxy_req(request_rec *r, request_rec *pr)
 {
     return core_create_req(pr);
@@ -5885,7 +5964,7 @@ static void register_hooks(apr_pool_t *p)
     /* FIXME: I suspect we can eliminate the need for these do_nothings - Ben */
     ap_hook_type_checker(do_nothing,NULL,NULL,APR_HOOK_REALLY_LAST);
     ap_hook_fixups(core_override_type,NULL,NULL,APR_HOOK_REALLY_FIRST);
-    ap_hook_create_request(core_create_req, NULL, NULL, APR_HOOK_MIDDLE);
+    ap_hook_create_request(core_create_request, NULL, NULL, APR_HOOK_MIDDLE);
     APR_OPTIONAL_HOOK(proxy, create_req, core_create_proxy_req, NULL, NULL,
                       APR_HOOK_MIDDLE);
     ap_hook_pre_mpm(ap_create_scoreboard, NULL, NULL, APR_HOOK_MIDDLE);
@@ -5918,7 +5997,7 @@ static void register_hooks(apr_pool_t *p)
                                   NULL, AP_FTYPE_NETWORK);
     ap_request_core_filter_handle =
         ap_register_output_filter("REQ_CORE", ap_request_core_filter,
-                                  NULL, AP_FTYPE_CONNECTION - 1);
+                                  NULL, AP_FTYPE_CONNECTION);
     ap_subreq_core_filter_handle =
         ap_register_output_filter("SUBREQ_CORE", ap_sub_req_output_filter,
                                   NULL, AP_FTYPE_CONTENT_SET);
Index: modules/http/http_core.c
===================================================================
--- modules/http/http_core.c	(revision 1875498)
+++ modules/http/http_core.c	(working copy)
@@ -268,8 +268,6 @@ static int http_create_request(request_rec *r)
                                     NULL, r, r->connection);
         ap_add_output_filter_handle(ap_http_outerror_filter_handle,
                                     NULL, r, r->connection);
-        ap_add_output_filter_handle(ap_request_core_filter_handle,
-                                    NULL, r, r->connection);
     }
 
     return OK;
Index: modules/http/http_request.c
===================================================================
--- modules/http/http_request.c	(revision 1875498)
+++ modules/http/http_request.c	(working copy)
@@ -350,7 +350,6 @@ AP_DECLARE(void) ap_process_request_after_handler(
     apr_bucket_brigade *bb;
     apr_bucket *b;
     conn_rec *c = r->connection;
-    ap_filter_t *f;
 
     bb = ap_acquire_brigade(c);
 
@@ -371,15 +370,11 @@ AP_DECLARE(void) ap_process_request_after_handler(
 
     /* All the request filters should have bailed out on EOS, and in any
      * case they shouldn't have to handle this EOR which will destroy the
-     * request underneath them. So go straight to the core request filter
-     * which (if any) will take care of the setaside buckets.
+     * request underneath them. So go straight to the connection filters,
+     * the first of which being ap_request_core_filter() to take care of
+     * request level setaside buckets.
      */
-    for (f = r->output_filters; f; f = f->next) {
-        if (f->frec == ap_request_core_filter_handle) {
-            break;
-        }
-    }
-    ap_pass_brigade(f ? f : c->output_filters, bb);
+    ap_pass_brigade(c->output_filters, bb);
 
     /* The EOR bucket has either been handled by an output filter (eg.
      * deleted or moved to a buffered_bb => no more in bb), or an error
Index: modules/http/http_filters.c
===================================================================
--- modules/http/http_filters.c	(revision 1875498)
+++ modules/http/http_filters.c	(working copy)
@@ -1286,6 +1286,10 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_http_heade
          */
         if (AP_BUCKET_IS_EOC(e)) {
             ap_remove_output_filter(f);
+            if (!ctx->headers_sent) {
+                e = ap_bucket_sor_create(c->bucket_alloc, r);
+                APR_BRIGADE_INSERT_HEAD(b, e);
+            }
             return ap_pass_brigade(f->next, b);
         }
     }
@@ -1452,6 +1456,8 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_http_heade
 
     terminate_header(b2);
 
+    e = ap_bucket_sor_create(c->bucket_alloc, r);
+    APR_BRIGADE_INSERT_HEAD(b2, e);
     if (header_only) {
         e = APR_BRIGADE_LAST(b);
         if (e != APR_BRIGADE_SENTINEL(b) && APR_BUCKET_IS_EOS(e)) {
Index: server/request.c
===================================================================
--- server/request.c	(revision 1875498)
+++ server/request.c	(working copy)
@@ -2058,17 +2058,23 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_sub_req_ou
     return APR_SUCCESS;
 }
 
+struct request_core_filter_ctx {
+    request_rec *r;             /* request at the start of bb, NULL if none */
+    apr_bucket_brigade *bb;     /* buckets received but not yet passed on */
+    apr_bucket_brigade *tmp_bb; /* batch being assembled for f->next */
+};
+
 AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
                                                             apr_bucket_brigade *bb)
 {
     apr_status_t status = APR_SUCCESS;
+    struct request_core_filter_ctx *ctx = f->ctx;
     apr_read_type_e block = APR_NONBLOCK_READ;
-    conn_rec *c = f->r->connection;
     apr_bucket *flush_upto = NULL;
-    apr_bucket_brigade *tmp_bb;
     apr_size_t tmp_bb_len = 0;
+    apr_bucket *bucket, *next;
     core_server_config *conf;
-    int seen_eor = 0;
+    apr_status_t rv;
 
     /*
      * Handle the AsyncFilter directive. We limit the filters that are
@@ -2079,99 +2085,145 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_co
         return ap_pass_brigade(f->next, bb);
     }
 
-    conf = ap_get_core_module_config(f->r->server->module_config);
+    if (ctx == NULL) {
+        ctx = apr_pcalloc(f->c->pool, sizeof(*ctx));
+        ctx->bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+        ctx->tmp_bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+        f->ctx = ctx;
+    }
 
-    /* Reinstate any buffered content */
-    ap_filter_reinstate_brigade(f, bb, &flush_upto);
-
-    /* After EOR is passed downstream, anything pooled on the request may
-     * be destroyed (including bb!), but not tmp_bb which is acquired from
-     * c->pool (and released after the below loop).
+    /* This filter tracks the ranges in between SOR:EOR buckets, so to use the
+     * relevant request pool for potential setasides (e.g. network congestion).
+     *
+     * Let f->r track the request for the current SOR:EOR range being walked,
+     * and ctx->r track the request at the start of ctx->bb. When it's time to
+     * send the/some buckets from ctx->bb out, ctx->r is set to f->r (i.e. the
+     * current request for the first remaining bucket), and the next call can
+     * start from there (anything not sent is pending to the next call).
+     *
+     * Outside SOR:EOR ranges, f->r is NULL thus potential setasides happen
+     * with no request pool; the buckets are assumed to be non-morphing and
+     * have the lifetime of the connection, supposedly metadata only.
      */
-    tmp_bb = ap_acquire_brigade(f->c);
+    f->r = ctx->r;
+    conf = f->r ? ap_get_core_module_config(f->r->server->module_config)
+                : ap_get_core_module_config(f->c->base_server->module_config);
 
-    /* Don't touch *bb after seen_eor */
-    while (status == APR_SUCCESS && !seen_eor && !APR_BRIGADE_EMPTY(bb)) {
-        apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
-        int do_pass = 0;
+    APR_BRIGADE_CONCAT(ctx->bb, bb);
+    ap_filter_reinstate_brigade(f, ctx->bb, &flush_upto);
 
-        if (AP_BUCKET_IS_EOR(bucket)) {
-            /* pass out everything and never come back again,
-             * r is destroyed with this bucket!
+    while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(ctx->bb)) {
+        int do_pass = 0, do_morph;
+
+        bucket = APR_BRIGADE_FIRST(ctx->bb);
+        do_morph = (f->r && bucket->length == (apr_size_t)-1);
+
+        /* if the core has set aside data, back off and try later */
+        if (!flush_upto) {
+            if (ap_filter_should_yield(f->next)) {
+                break;
+            }
+        }
+        else if (flush_upto == bucket) {
+            /* If the above ap_filter_reinstate_brigade() wanted us to pass a
+             * morphing bucket because f->r was unset at that time, just ignore
+             * it here if we now know the current request. Below _read() will
+             * morph the bucket so the real data length will be accounted for
+             * in the max threshold anyway.
              */
-            APR_BRIGADE_CONCAT(tmp_bb, bb);
-            ap_remove_output_filter(f);
-            seen_eor = 1;
+            do_pass = !do_morph;
+            flush_upto = NULL;
         }
-        else {
-            /* if the core has set aside data, back off and try later */
-            if (!flush_upto) {
-                if (ap_filter_should_yield(f->next)) {
-                    break;
-                }
-            }
-            else if (flush_upto == bucket) {
-                flush_upto = NULL;
-            }
 
-            /* have we found a morphing bucket? if so, force it to morph into
-             * something safe to pass down to the connection filters without
-             * needing to be set aside.
+        if (AP_BUCKET_IS_SOR(bucket)) {
+            f->r = ap_bucket_sor_get_r(bucket);
+            conf = ap_get_core_module_config(f->r->server->module_config);
+        }
+        else if (AP_BUCKET_IS_EOR(bucket)) {
+            f->r = NULL;
+            conf = ap_get_core_module_config(f->c->base_server->module_config);
+        }
+        else if (do_morph) {
+            const char *data;
+            apr_size_t size;
+
+            /* Found a morphing bucket, force it to morph into something safe
+             * to pass down to the connection filters without needing to be
+             * set aside.
              */
-            if (bucket->length == (apr_size_t)-1) {
-                const char *data;
-                apr_size_t size;
-
-                status = apr_bucket_read(bucket, &data, &size, block);
-                if (status != APR_SUCCESS) {
-                    if (!APR_STATUS_IS_EAGAIN(status)
-                            || block != APR_NONBLOCK_READ) {
-                        break;
-                    }
-                    /* Flush everything so far and retry in blocking mode */
-                    bucket = apr_bucket_flush_create(c->bucket_alloc);
+            rv = apr_bucket_read(bucket, &data, &size, block);
+            if (rv != APR_SUCCESS) {
+                if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
+                    /* Retry in blocking mode */
                     block = APR_BLOCK_READ;
                 }
                 else {
-                    tmp_bb_len += size;
-                    block = APR_NONBLOCK_READ;
+                    /* Last loop, report failure still */
+                    status = rv;
                 }
+                /* In any case, flush everything so far */
+                bucket = apr_bucket_flush_create(f->c->bucket_alloc);
+                do_pass = 1;
             }
             else {
-                tmp_bb_len += bucket->length;
+                block = APR_NONBLOCK_READ;
             }
+        }
 
-            /* move the bucket to tmp_bb and check whether it exhausts bb or
-             * brings tmp_bb above the limit; in both cases it's time to pass
-             * everything down the chain.
-             */
-            APR_BUCKET_REMOVE(bucket);
-            APR_BRIGADE_INSERT_TAIL(tmp_bb, bucket);
-            if (APR_BRIGADE_EMPTY(bb)
-                    || APR_BUCKET_IS_FLUSH(bucket)
-                    || tmp_bb_len >= conf->flush_max_threshold) {
-                do_pass = 1;
+        /* Move the bucket to ctx->tmp_bb and see if it empties ctx->bb, or if
+         * ctx->tmp_bb reaches flush threshold; in both cases it's time to pass
+         * everything down the chain.
+         */
+        APR_BUCKET_REMOVE(bucket);
+        APR_BRIGADE_INSERT_TAIL(ctx->tmp_bb, bucket);
+        if (do_pass
+                || APR_BRIGADE_EMPTY(ctx->bb)
+                || (bucket->length == (apr_size_t)-1)
+                || (tmp_bb_len += bucket->length) >=
+                        conf->flush_max_threshold) {
+            rv = ap_pass_brigade(f->next, ctx->tmp_bb);
+            apr_brigade_cleanup(ctx->tmp_bb);
+            if (status == APR_SUCCESS) {
+                status = rv;
             }
-        }
-
-        if (do_pass || seen_eor) {
-            status = ap_pass_brigade(f->next, tmp_bb);
-            apr_brigade_cleanup(tmp_bb);
             tmp_bb_len = 0;
+            ctx->r = f->r;
         }
     }
 
-    /* Don't touch *bb after seen_eor */
-    if (!seen_eor) {
-        apr_status_t rv;
-        APR_BRIGADE_PREPEND(bb, tmp_bb);
-        rv = ap_filter_setaside_brigade(f, bb);
-        if (status == APR_SUCCESS) {
-            status = rv;
+    /* Setaside remaining buckets left in ctx->[tmp_]bb (yield or error), but
+     * do that per request, according to the SOR:EOR range(s). First reset f->r
+     * to the request at the beginning of ctx->bb since we start from there.
+     */
+    f->r = ctx->r;
+    APR_BRIGADE_PREPEND(ctx->bb, ctx->tmp_bb);
+    for (bucket = APR_BRIGADE_FIRST(ctx->bb);
+         bucket != APR_BRIGADE_SENTINEL(ctx->bb);
+         bucket = next) {
+        next = APR_BUCKET_NEXT(bucket);
+
+        if (AP_BUCKET_IS_SOR(bucket)) {
+            f->r = ap_bucket_sor_get_r(bucket);
         }
+        else if (AP_BUCKET_IS_EOR(bucket)
+                 || next == APR_BRIGADE_SENTINEL(ctx->bb)) {
+            apr_brigade_split_ex(ctx->bb, next, ctx->tmp_bb);
+            rv = ap_filter_setaside_brigade(f, ctx->bb);
+            APR_BRIGADE_CONCAT(ctx->bb, ctx->tmp_bb);
+            if (rv != APR_SUCCESS) {
+                if (status == APR_SUCCESS) {
+                    status = rv;
+                }
+                apr_brigade_cleanup(ctx->bb);
+                ctx->r = NULL;
+                break;
+            }
+            f->r = NULL;
+        }
     }
 
-    ap_release_brigade(f->c, tmp_bb);
+    /* Not to be leaked */
+    f->r = NULL;
 
     return status;
 }
Index: include/http_request.h
===================================================================
--- include/http_request.h	(revision 1875498)
+++ include/http_request.h	(working copy)
@@ -628,6 +628,51 @@ AP_DECLARE(apr_bucket *) ap_bucket_eor_create(apr_
                                               request_rec *r);
 
 /**
+ * Get the request of an End Of REQUEST (EOR) bucket
+ * @param b The bucket to inspect, which must be an EOR bucket
+ * @return The request held by the bucket (NULL once its cleanup has run)
+ */
+AP_DECLARE(request_rec *) ap_bucket_eor_get_r(apr_bucket *b);
+
+
+/** Start Of REQUEST (SOR) bucket */
+AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_sor;
+
+/**
+ * Determine if a bucket is a Start Of REQUEST (SOR) bucket
+ * @param e The bucket to inspect (any expression yielding an apr_bucket *)
+ * @return true or false
+ */
+#define AP_BUCKET_IS_SOR(e)         ((e)->type == &ap_bucket_type_sor)
+
+/**
+ * Make the bucket passed in a Start Of REQUEST (SOR) bucket
+ * @param b The bucket to make into an SOR bucket
+ * @param r The request the SOR bucket refers to (not destroyed with it)
+ * @return The bucket passed in, now an SOR bucket
+ */
+AP_DECLARE(apr_bucket *) ap_bucket_sor_make(apr_bucket *b, request_rec *r);
+
+/**
+ * Create a bucket referring to a Start Of REQUEST (SOR). This bucket
+ * holds a pointer to the request_rec "owning" the following buckets,
+ * until the next SOR (e.g. suspend/resume) or End Of Request (EOR).
+ *
+ * @param list The freelist from which this bucket should be allocated
+ * @param r The request the SOR bucket refers to (not destroyed with it)
+ * @return The new bucket, or NULL if allocation failed
+ */
+AP_DECLARE(apr_bucket *) ap_bucket_sor_create(apr_bucket_alloc_t *list,
+                                              request_rec *r);
+
+/**
+ * Get the request of a Start Of REQUEST (SOR) bucket
+ * @param b The bucket to inspect, which must be an SOR bucket
+ * @return The request the SOR bucket was made with
+ */
+AP_DECLARE(request_rec *) ap_bucket_sor_get_r(apr_bucket *b);
+
+/**
  * Can be used within any handler to determine if any authentication
  * is required for the current request.  Note that if used with an
  * access_checker hook, an access_checker_ex hook or an authz provider; the
Index: server/eor_bucket.c
===================================================================
--- server/eor_bucket.c	(revision 1875498)
+++ server/eor_bucket.c	(working copy)
@@ -21,7 +21,7 @@
 
 typedef struct {
     apr_bucket_refcount refcount;
-    request_rec *data;
+    request_rec *r;
 } ap_bucket_eor;
 
 static apr_status_t eor_bucket_cleanup(void *data)
@@ -30,11 +30,13 @@ static apr_status_t eor_bucket_cleanup(void *data)
 
     if (*rp) {
         request_rec *r = *rp;
+
         /*
          * If eor_bucket_destroy is called after us, this prevents
          * eor_bucket_destroy from trying to destroy the pool again.
          */
         *rp = NULL;
+
         /* Update child status and log the transaction */
         ap_update_child_status(r->connection->sbh, SERVER_BUSY_LOG, r);
         ap_run_log_transaction(r);
@@ -42,14 +44,7 @@ static apr_status_t eor_bucket_cleanup(void *data)
             ap_increment_counts(r->connection->sbh, r);
         }
     }
-    return APR_SUCCESS;
-}
 
-static apr_status_t eor_bucket_read(apr_bucket *b, const char **str,
-                                    apr_size_t *len, apr_read_type_e block)
-{
-    *str = NULL;
-    *len = 0;
     return APR_SUCCESS;
 }
 
@@ -58,7 +53,7 @@ AP_DECLARE(apr_bucket *) ap_bucket_eor_make(apr_bu
     ap_bucket_eor *h;
 
     h = apr_bucket_alloc(sizeof(*h), b->list);
-    h->data = r;
+    h->r = r;
 
     b = apr_bucket_shared_make(b, h, 0, 0);
     b->type = &ap_bucket_type_eor;
@@ -85,17 +80,23 @@ AP_DECLARE(apr_bucket *) ap_bucket_eor_create(apr_
          * We need to use a pre-cleanup here because a module may create a
          * sub-pool which is still needed during the log_transaction hook.
          */
-        apr_pool_pre_cleanup_register(r->pool, &h->data, eor_bucket_cleanup);
+        apr_pool_pre_cleanup_register(r->pool, &h->r, eor_bucket_cleanup);
     }
     return b;
 }
 
+AP_DECLARE(request_rec *) ap_bucket_eor_get_r(apr_bucket *b)
+{
+    AP_DEBUG_ASSERT(AP_BUCKET_IS_EOR(b)); /* caller must pass an EOR bucket */
+    return ((ap_bucket_eor *)b->data)->r; /* NULL after eor_bucket_cleanup */
+}
+
 static void eor_bucket_destroy(void *data)
 {
     ap_bucket_eor *h = data;
 
     if (apr_bucket_shared_destroy(h)) {
-        request_rec *r = h->data;
+        request_rec *r = h->r;
         if (r) {
             /* eor_bucket_cleanup will be called when the pool gets destroyed */
             apr_pool_destroy(r->pool);
@@ -104,10 +105,55 @@ static void eor_bucket_destroy(void *data)
     }
 }
 
+/* Turn b into a Start Of REQUEST (SOR) metadata bucket referring to r. */
+AP_DECLARE(apr_bucket *) ap_bucket_sor_make(apr_bucket *b, request_rec *r)
+{
+    b->type   = &ap_bucket_type_sor;
+    b->data   = r;  /* read back by ap_bucket_sor_get_r(), never destroyed */
+    b->length = 0;  /* carries no payload */
+    b->start  = 0;
+    return b;
+}
+
+AP_DECLARE(apr_bucket *) ap_bucket_sor_create(apr_bucket_alloc_t *list,
+                                              request_rec *r)
+{
+    apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
+    /* Fresh bucket taken from list, then typed as an SOR referring to r. */
+    APR_BUCKET_INIT(b);
+    b->free = apr_bucket_free;
+    b->list = list;
+    return ap_bucket_sor_make(b, r);
+}
+
+AP_DECLARE(request_rec *) ap_bucket_sor_get_r(apr_bucket *b)
+{
+    AP_DEBUG_ASSERT(AP_BUCKET_IS_SOR(b)); /* caller must pass an SOR bucket */
+    return (request_rec *)b->data; /* set by ap_bucket_sor_make() */
+}
+
+/* Shared read() for the SOR and EOR metadata buckets: there is no payload. */
+static apr_status_t null_bucket_read(apr_bucket *b, const char **str,
+                                     apr_size_t *len, apr_read_type_e block)
+{
+    *len = 0;
+    *str = NULL;
+    return APR_SUCCESS;
+}
+
+AP_DECLARE_DATA const apr_bucket_type_t ap_bucket_type_sor = {
+    "SOR", 5, APR_BUCKET_METADATA,  /* metadata bucket, no data of its own */
+    apr_bucket_destroy_noop,        /* the request is not destroyed with it */
+    null_bucket_read,               /* reads yield NULL / zero length */
+    apr_bucket_setaside_noop,
+    apr_bucket_split_notimpl,
+    apr_bucket_simple_copy
+};
+
 AP_DECLARE_DATA const apr_bucket_type_t ap_bucket_type_eor = {
     "EOR", 5, APR_BUCKET_METADATA,
     eor_bucket_destroy,
-    eor_bucket_read,
+    null_bucket_read,
     apr_bucket_setaside_noop,
     apr_bucket_split_notimpl,
     apr_bucket_shared_copy

Reply via email to