On Thu, Jul 1, 2021 at 12:38 PM Yann Ylavic <ylavic....@gmail.com> wrote:
>
> On Wed, Jun 30, 2021 at 9:42 AM Joe Orton <jor...@redhat.com> wrote:
> >
> > If so I wonder if it wouldn't be better to overload FLUSH for this, e.g.
> > by having a FLUSH bucket with a non-NULL ->data pointer or something?
> > The core knows it is special but everywhere else treats as FLUSH.
>
> That's a great idea, let me try that.

Here is a new patch; the semantics of WC buckets are defined as follows:
/**
 * @brief Write Completion (WC) bucket
 *
 * A WC bucket is a FLUSH bucket with special ->data == &ap_bucket_wc_data,
 * still both AP_BUCKET_IS_WC() and APR_BUCKET_IS_FLUSH() hold for them so
 * they have the same semantics for most filters, namely:
 *   Everything produced before shall be passed to the next filter, including
 *   the WC/FLUSH bucket itself.
 * The distinction between WC and FLUSH buckets is only for filters that care
 * about write completion (calling ap_filter_reinstate_brigade() with non-NULL
 * flush_upto), those can setaside WC buckets and the preceding data provided
 * they have first determined that the next filter(s) have pending data
 * already, usually by calling ap_filter_should_yield(f->next).
 */

For now, the only filters that care about write completion are
ap_core_output_filter() and ssl_io_filter_output(), which try to fill
the pipe as much as possible, using
ap_filter_reinstate_brigade(&flush_upto) to determine whether they
should flush (blocking) or set aside their remaining data.

So ap_filter_reinstate_brigade() is changed to not treat WC buckets as
FLUSH buckets, which keeps the above filters working as before (and
correctly w.r.t. WC bucket semantics). I first thought of adding a new
ap_filter_rec->proto_flags value like AP_FILTER_PROTO_WC_READY, so that
this is done only for filters registered with this flag, and then
registering ap_core_output_filter() and ssl_io_filter_output()
accordingly. But since they are the only users of
ap_filter_reinstate_brigade() with flush_upto != NULL, I kept it simple
for now.

WDYT?

Cheers;
Yann.
Index: include/ap_mmn.h
===================================================================
--- include/ap_mmn.h	(revision 1892424)
+++ include/ap_mmn.h	(working copy)
@@ -673,7 +673,7 @@
  *                         ap_proxy_tunnel_conn_get_transferred() change
  *                         ap_proxy_transfer_between_connections() sent to apr_off_t *.
  * 20210531.0 (2.5.1-dev)  add conn_rec->outgoing and ap_ssl_bind_outgoing()
- * 20210531.1 (2.5.1-dev)  Add ap_bucket_type_wc, ap_bucket_wc_make() and
+ * 20210531.1 (2.5.1-dev)  Add ap_bucket_wc_data, ap_bucket_wc_make() and
  *                         ap_bucket_wc_create() to util_filter.h
  * 20210531.2 (2.5.1-dev)  Add ap_proxy_get_worker_ex() and
  *                         ap_proxy_define_worker_ex() to mod_proxy.h
Index: include/util_filter.h
===================================================================
--- include/util_filter.h	(revision 1892424)
+++ include/util_filter.h	(working copy)
@@ -763,15 +763,31 @@ AP_DECLARE(void) ap_filter_protocol(ap_filter_t* f
 /** Filter is incompatible with "Cache-Control: no-transform" */
 #define AP_FILTER_PROTO_TRANSFORM 0x20
 
-/** Write Completion (WC) bucket */
-AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_wc;
+/**
+ * @brief Write Completion (WC) bucket
+ *
+ * A WC bucket is a FLUSH bucket with special ->data == &ap_bucket_wc_data,
+ * still both AP_BUCKET_IS_WC() and APR_BUCKET_IS_FLUSH() hold for them so
+ * they have the same semantics for most filters, namely:
+ *   Everything produced before shall be passed to the next filter, including
+ *   the WC/FLUSH bucket itself.
+ * The distinction between WC and FLUSH buckets is only for filters that care
+ * about write completion (calling ap_filter_reinstate_brigade() with non-NULL
+ * flush_upto), those can setaside WC buckets and the preceding data provided
+ * they have first determined that the next filter(s) have pending data
+ * already, usually by calling ap_filter_should_yield(f->next).
+ */
 
+/** Write Completion (WC) bucket data mark */
+AP_DECLARE_DATA extern const char ap_bucket_wc_data;
+
 /**
  * Determine if a bucket is a Write Completion (WC) bucket
  * @param e The bucket to inspect
  * @return true or false
  */
-#define AP_BUCKET_IS_WC(e) ((e)->type == &ap_bucket_type_wc)
+#define AP_BUCKET_IS_WC(e) (APR_BUCKET_IS_FLUSH(e) && \
+                            (e)->data == (void *)&ap_bucket_wc_data)
 
 /**
  * Make the bucket passed in a Write Completion (WC) bucket
Index: server/util_filter.c
===================================================================
--- server/util_filter.c	(revision 1892424)
+++ server/util_filter.c	(working copy)
@@ -976,7 +976,9 @@ AP_DECLARE(apr_status_t) ap_filter_setaside_brigad
              e = next) {
             next = APR_BUCKET_NEXT(e);
 
-            /* Strip WC buckets added by ap_filter_output_pending(). */
+            /* WC buckets will be added back by ap_filter_output_pending()
+             * at the tail.
+             */
             if (AP_BUCKET_IS_WC(e)) {
                 apr_bucket_delete(e);
                 continue;
@@ -1069,6 +1071,7 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
     int eor_buckets_in_brigade, opaque_buckets_in_brigade;
     struct ap_filter_private *fp = f->priv;
     core_server_config *conf;
+    int is_flush;
  
     ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
                   "reinstate %s brigade to %s brigade in '%s' %sput filter",
@@ -1132,7 +1135,15 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
          bucket = next) {
         next = APR_BUCKET_NEXT(bucket);
 
-        if (AP_BUCKET_IS_EOR(bucket)) {
+        /* When called with flush_upto != NULL, we assume that the caller does
+         * the right thing to potentially setaside WC buckets (per semantics),
+         * so we don't treat them as FLUSH(_upto) here.
+         */
+        is_flush = (APR_BUCKET_IS_FLUSH(bucket) && !AP_BUCKET_IS_WC(bucket));
+        if (is_flush) {
+            /* handled below */
+        }
+        else if (AP_BUCKET_IS_EOR(bucket)) {
             eor_buckets_in_brigade++;
         }
         else if (bucket->length == (apr_size_t)-1) {
@@ -1145,7 +1156,7 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
             }
         }
 
-        if (APR_BUCKET_IS_FLUSH(bucket)
+        if (is_flush
             || (memory_bytes_in_brigade > conf->flush_max_threshold)
             || (conf->flush_max_pipelined >= 0
                 && eor_buckets_in_brigade > conf->flush_max_pipelined)) {
@@ -1152,7 +1163,7 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_briga
             /* this segment of the brigade MUST be sent before returning. */
 
             if (APLOGctrace6(f->c)) {
-                char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
+                char *reason = is_flush ?
                                "FLUSH bucket" :
                                (memory_bytes_in_brigade > conf->flush_max_threshold) ?
                                "max threshold" : "max requests in pipeline";
@@ -1400,22 +1411,15 @@ AP_DECLARE(void) ap_filter_protocol(ap_filter_t *f
     f->frec->proto_flags = flags ;
 }
 
+/* Write Completion (WC) bucket implementation */
 
-static apr_status_t wc_bucket_read(apr_bucket *b, const char **str,
-                                   apr_size_t *len, apr_read_type_e block)
-{
-    *str = NULL;
-    *len = 0;
-    return APR_SUCCESS;
-}
+AP_DECLARE_DATA const char ap_bucket_wc_data;
 
 AP_DECLARE(apr_bucket *) ap_bucket_wc_make(apr_bucket *b)
 {
-    b->length = 0;
-    b->start  = 0;
-    b->data   = NULL;
-    b->type   = &ap_bucket_type_wc;
-
+    /* FLUSH bucket with special ->data mark (instead of NULL) */
+    b = apr_bucket_flush_make(b);
+    b->data = (void *)&ap_bucket_wc_data;
     return b;
 }
 
@@ -1428,12 +1432,3 @@ AP_DECLARE(apr_bucket *) ap_bucket_wc_create(apr_b
     b->list = list;
     return ap_bucket_wc_make(b);
 }
-
-AP_DECLARE_DATA const apr_bucket_type_t ap_bucket_type_wc = {
-    "WC", 5, APR_BUCKET_METADATA,
-    apr_bucket_destroy_noop,
-    wc_bucket_read,
-    apr_bucket_setaside_noop,
-    apr_bucket_split_notimpl,
-    apr_bucket_simple_copy
-};
Index: modules/proxy/proxy_util.c
===================================================================
--- modules/proxy/proxy_util.c	(revision 1892424)
+++ modules/proxy/proxy_util.c	(working copy)
@@ -4551,6 +4551,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_betw
             APR_BRIGADE_INSERT_TAIL(bb_o, b);
         }
         else {
+            /* Prevent setaside/coalescing by intermediate filters. */
             b = ap_bucket_wc_create(bb_o->bucket_alloc);
             APR_BRIGADE_INSERT_TAIL(bb_o, b);
         }

Reply via email to