Index: server/Makefile.in
===================================================================
--- server/Makefile.in	(revision 1543235)
+++ server/Makefile.in	(working copy)
@@ -13,7 +13,7 @@
 	util_charset.c util_cookies.c util_debug.c util_xml.c \
 	util_filter.c util_pcre.c util_regex.c exports.c \
 	scoreboard.c error_bucket.c protocol.c core.c request.c provider.c \
-	eoc_bucket.c eor_bucket.c core_filters.c \
+	eoc_bucket.c eor_bucket.c nonblock_bucket.c core_filters.c \
 	util_expr_parse.c util_expr_scan.c util_expr_eval.c \
 	apreq_cookie.c apreq_error.c apreq_module.c \
 	apreq_module_cgi.c apreq_module_custom.c apreq_param.c \
Index: server/core.c
===================================================================
--- server/core.c	(revision 1543235)
+++ server/core.c	(working copy)
@@ -112,6 +112,7 @@
 
 /* Handles for core filters */
 AP_DECLARE_DATA ap_filter_rec_t *ap_subreq_core_filter_handle;
+AP_DECLARE_DATA ap_filter_rec_t *ap_core_helper_filter_handle;
 AP_DECLARE_DATA ap_filter_rec_t *ap_core_output_filter_handle;
 AP_DECLARE_DATA ap_filter_rec_t *ap_content_length_filter_handle;
 AP_DECLARE_DATA ap_filter_rec_t *ap_core_input_filter_handle;
@@ -4880,6 +4881,7 @@
 
     ap_set_core_module_config(net->c->conn_config, csd);
     ap_add_input_filter_handle(ap_core_input_filter_handle, net, NULL, net->c);
+    ap_add_output_filter_handle(ap_core_helper_filter_handle, net, NULL, net->c);
     ap_add_output_filter_handle(ap_core_output_filter_handle, net, NULL, net->c);
     return DONE;
 }
@@ -5112,6 +5114,9 @@
     ap_content_length_filter_handle =
         ap_register_output_filter("CONTENT_LENGTH", ap_content_length_filter,
                                   NULL, AP_FTYPE_PROTOCOL);
+    ap_core_helper_filter_handle =
+        ap_register_output_filter("CORE_HELPER", ap_core_helper_filter,
+                                  NULL, AP_FTYPE_CONNECTION);
     ap_core_output_filter_handle =
         ap_register_output_filter("CORE", ap_core_output_filter,
                                   NULL, AP_FTYPE_NETWORK);
Index: server/core_filters.c
===================================================================
--- server/core_filters.c	(revision 1543235)
+++ server/core_filters.c	(working copy)
@@ -83,6 +83,7 @@
     apr_bucket_brigade *tmp_flush_bb;
     apr_pool_t *deferred_write_pool;
     apr_size_t bytes_written;
+    int reached_core:1;
 };
 
 struct core_filter_ctx {
@@ -369,23 +370,21 @@
  */
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
 
-apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
+apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
 {
     conn_rec *c = f->c;
     core_net_rec *net = f->ctx;
     core_output_filter_ctx_t *ctx = net->out_ctx;
-    apr_bucket_brigade *bb = NULL;
     apr_bucket *bucket, *next, *flush_upto = NULL;
     apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
-    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+    int eor_buckets_in_brigade, morphing_bucket_in_brigade,
+        nonblock_bucket_in_brigade;
     apr_status_t rv;
     int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);
 
     /* Fail quickly if the connection has already been aborted. */
     if (c->aborted) {
-        if (new_bb != NULL) {
-            apr_brigade_cleanup(new_bb);
-        }
+        apr_brigade_cleanup(bb);
         return APR_ECONNABORTED;
     }
 
@@ -402,32 +401,30 @@
         ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
     }
 
-    if (new_bb != NULL)
-        bb = new_bb;
+    /* we reached the core, leave a cookie */
+    ctx->reached_core = 1;
 
-    if ((ctx->buffered_bb != NULL) &&
-        !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
-        if (new_bb != NULL) {
-            APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
+    if (!APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
+        APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
+        c->data_in_output_filters--;
+        if (loglevel >= APLOG_TRACE8) {
+            ap_log_cerror(
+                    APLOG_MARK, APLOG_TRACE8, APR_SUCCESS, f->c,
+                    "core_output_filter: buffer emptied, data in the "
+                    "output filters: %d", f->c->data_in_output_filters);
         }
-        else {
-            bb = ctx->buffered_bb;
-        }
-        c->data_in_output_filters = 0;
     }
-    else if (new_bb == NULL) {
-        return APR_SUCCESS;
-    }
 
     /* Scan through the brigade and decide whether to attempt a write,
      * and how much to write, based on the following rules:
      *
-     *  1) The new_bb is null: Do a nonblocking write of as much as
-     *     possible: do a nonblocking write of as much data as possible,
-     *     then save the rest in ctx->buffered_bb.  (If new_bb == NULL,
-     *     it probably means that the MPM is doing asynchronous write
-     *     completion and has just determined that this connection
-     *     is writable.)
+     *  1) The brigade contains a NONBLOCK bucket, or we are in the
+     *     CONN_STATE_WRITE_COMPLETION state: do a nonblocking write
+     *     of as much data as possible, then save the rest in
+     *     ctx->buffered_bb. If we are done, set c->data_in_output_filters
+     *     to zero to signal to the core that write completion is over.
+     *     When a flush and nonblock bucket exist in the same brigade, the
+     *     nonblock bucket wins.
      *
      *  2) Determine if and up to which bucket we need to do a blocking
      *     write:
@@ -466,26 +463,11 @@
      *     then save the rest in ctx->buffered_bb.
      */
 
-    if (new_bb == NULL) {
-        rv = send_brigade_nonblocking(net->client_socket, bb,
-                                      &(ctx->bytes_written), c);
-        if (APR_STATUS_IS_EAGAIN(rv)) {
-            rv = APR_SUCCESS;
-        }
-        else if (rv != APR_SUCCESS) {
-            /* The client has aborted the connection */
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
-                          "core_output_filter: writing data to the network");
-            c->aborted = 1;
-        }
-        setaside_remaining_output(f, ctx, bb, c);
-        return rv;
-    }
-
     bytes_in_brigade = 0;
     non_file_bytes_in_brigade = 0;
     eor_buckets_in_brigade = 0;
     morphing_bucket_in_brigade = 0;
+    nonblock_bucket_in_brigade = 0;
 
     for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
          bucket = next) {
@@ -509,11 +491,16 @@
         else if (AP_BUCKET_IS_EOR(bucket)) {
             eor_buckets_in_brigade++;
         }
+        else if (AP_BUCKET_IS_NONBLOCK(bucket)) {
+            nonblock_bucket_in_brigade++;
+        }
 
-        if (APR_BUCKET_IS_FLUSH(bucket)
-            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
-            || morphing_bucket_in_brigade
-            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+        if (!nonblock_bucket_in_brigade
+                && !(c->cs && c->cs->state == CONN_STATE_WRITE_COMPLETION)
+                && (APR_BUCKET_IS_FLUSH(bucket)
+                        || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+                        || morphing_bucket_in_brigade
+                        || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE)) {
             /* this segment of the brigade MUST be sent before returning. */
 
             if (loglevel >= APLOG_TRACE6) {
@@ -548,12 +535,12 @@
         }
     }
 
+    /* Handle any flush buckets we may have found */
     if (flush_upto != NULL) {
         ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
                                                  ctx->tmp_flush_bb);
         if (loglevel >= APLOG_TRACE8) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                              "flushing now");
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c, "flushing now");
         }
         rv = send_brigade_blocking(net->client_socket, bb,
                                    &(ctx->bytes_written), c);
@@ -581,6 +568,33 @@
                       eor_buckets_in_brigade, morphing_bucket_in_brigade);
     }
 
+    /* Handle non blocking writes. If we saw a non blocking bucket, attempt
+     * a non blocking write. If the non blocking write would have returned
+     * APR_EAGAIN, set aside the remainder and return APR_EAGAIN.
+     */
+    if (nonblock_bucket_in_brigade) {
+        if (loglevel >= APLOG_TRACE6) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE8, APR_SUCCESS, c,
+                          "core_output_filter: non blocking write "
+                          "to the network");
+        }
+        rv = send_brigade_nonblocking(net->client_socket, bb,
+                                      &(ctx->bytes_written), c);
+        if (APR_STATUS_IS_EAGAIN(rv)) {
+            setaside_remaining_output(f, ctx, bb, c);
+        }
+        else if (rv != APR_SUCCESS) {
+            /* The client has aborted the connection */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+                          "core_output_filter: writing data to the network");
+            c->aborted = 1;
+        }
+        return rv;
+    }
+
+    /* Otherwise handle a normal write. A non blocking write will be attempted,
+     * but we don't return APR_EAGAIN, we return APR_SUCCESS instead.
+     */
     if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
         rv = send_brigade_nonblocking(net->client_socket, bb,
                                       &(ctx->bytes_written), c);
@@ -617,7 +631,11 @@
     }
     remove_empty_buckets(bb);
     if (!APR_BRIGADE_EMPTY(bb)) {
-        c->data_in_output_filters = 1;
+        c->data_in_output_filters++;
+        ap_log_cerror(
+                APLOG_MARK, APLOG_TRACE8, APR_SUCCESS, f->c,
+                "core_output_filter: setting aside buffered data, data in the "
+                "output filters: %d", f->c->data_in_output_filters);
         if (bb != ctx->buffered_bb) {
             if (!ctx->deferred_write_pool) {
                 apr_pool_create(&ctx->deferred_write_pool, c->pool);
@@ -927,4 +945,72 @@
     return rv;
 }
 
+/**
+ * Connection output filter inserted ahead of the core output filter.
+ *
+ * Passes the brigade down the chain and uses the reached_core cookie in the
+ * core output filter context to detect whether ap_core_output_filter ran.
+ * If it did not, although c->data_in_output_filters says data is still
+ * pending, the core output filter is invoked directly with a NONBLOCK
+ * bucket so that the pending write is still attempted.
+ *
+ * @param f  This filter; f->ctx is the connection's core_net_rec
+ * @param bb The brigade to pass on; emptied and reused when compensating
+ * @return The status of the downstream pass, or of the direct call to the
+ *         core output filter when compensation took place
+ */
+apr_status_t ap_core_helper_filter(ap_filter_t *f, apr_bucket_brigade *bb)
+{
+    conn_rec *c = f->c;
+    core_net_rec *net = f->ctx;
+    core_output_filter_ctx_t *ctx = net->out_ctx;
+    apr_status_t rv;
+
+    /* clear the cookie; ap_core_output_filter sets it again if it runs */
+    if (ctx) {
+        ctx->reached_core = 0;
+    }
+
+    rv = ap_pass_brigade(f->next, bb);
+
+    /* net->out_ctx can be NULL on entry (hence the guard above). Reload it
+     * in case the core output filter created it during the pass; a stale
+     * NULL here would be misread as "core not reached".
+     */
+    ctx = net->out_ctx;
+
+    /* did we reach the core? compensate if not */
+    if (APR_SUCCESS == rv && (!ctx || !ctx->reached_core)
+            && c->data_in_output_filters) {
+        ap_filter_t *output_filter = c->output_filters;
+        apr_bucket *bucket;
+
+        /* At this point we tried a write to the filter stack above, but a
+         * buggy filter that swallows metadata buckets caused us to not reach
+         * the core. Compensate by calling the core output filter directly,
+         * giving it an opportunity to do the write we were trying to do.
+         *
+         * This compensation code should be removed in v2.6.
+         */
+
+        ap_log_cerror(
+                APLOG_MARK, APLOG_TRACE6, APR_SUCCESS, c,
+                "core_helper_filter: compensating for buggy filter that "
+                "didn't pass a metadata bucket");
+
+        /* drop whatever is left in bb and replace it with a lone NONBLOCK */
+        apr_brigade_cleanup(bb);
+        bucket = ap_bucket_nonblock_create(c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(bb, bucket);
+
+        /* find the core output filter, call it directly */
+        while (output_filter->next != NULL) {
+            output_filter = output_filter->next;
+        }
+        rv = output_filter->frec->filter_func.out_func(output_filter, bb);
+    }
+
+    return rv;
+}
+
 #endif
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c	(revision 1543235)
+++ server/mpm/event/event.c	(working copy)
@@ -201,6 +201,8 @@
     apr_pool_t *p;
     /** bucket allocator */
     apr_bucket_alloc_t *bucket_alloc;
+    /** empty brigade for write completion */
+    apr_bucket_brigade *bb;
     /** poll file descriptor information */
     apr_pollfd_t pfd;
     /** public parts of the connection state */
@@ -935,6 +937,7 @@
         apr_atomic_inc32(&connection_count);
         apr_pool_cleanup_register(c->pool, cs, decrement_connection_count,
                                   apr_pool_cleanup_null);
+        cs->bb = apr_brigade_create(p, cs->bucket_alloc);
         c->current_thread = thd;
         cs->c = c;
         c->cs = &(cs->pub);
@@ -1010,19 +1013,19 @@
     }
 
     if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
-        ap_filter_t *output_filter = c->output_filters;
+        /* pass a nonblock bucket into the output filters */
         apr_status_t rv;
+        apr_bucket *bucket;
         ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
-        while (output_filter->next != NULL) {
-            output_filter = output_filter->next;
-        }
-        rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
-        if (rv != APR_SUCCESS) {
+        bucket = ap_bucket_nonblock_create(c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(cs->bb, bucket);
+        rv = ap_pass_brigade(c->output_filters, cs->bb);
+        if (rv != APR_SUCCESS && rv != APR_EAGAIN) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
                           "network write failure in core output filter");
             cs->pub.state = CONN_STATE_LINGER;
         }
-        else if (c->data_in_output_filters) {
+        else if (c->data_in_output_filters || rv == APR_EAGAIN) {
             /* Still in WRITE_COMPLETION_STATE:
              * Set a write timeout for this connection, and let the
              * event thread poll for writeability.
Index: server/mpm/eventopt/eventopt.c
===================================================================
--- server/mpm/eventopt/eventopt.c	(revision 1543235)
+++ server/mpm/eventopt/eventopt.c	(working copy)
@@ -204,6 +204,8 @@
     apr_pool_t *p;
     /** bucket allocator */
     apr_bucket_alloc_t *bucket_alloc;
+    /** empty brigade for write completion */
+    apr_bucket_brigade *bb;
     /** poll file descriptor information */
     apr_pollfd_t pfd;
     /** public parts of the connection state */
@@ -986,6 +988,7 @@
         apr_atomic_inc32(&connection_count);
         apr_pool_cleanup_register(c->pool, cs, decrement_connection_count,
                                   apr_pool_cleanup_null);
+        cs->bb = apr_brigade_create(p, cs->bucket_alloc);
         c->current_thread = thd;
         cs->c = c;
         c->cs = &(cs->pub);
@@ -1060,19 +1063,19 @@
     }
 
     if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
-        ap_filter_t *output_filter = c->output_filters;
+        /* pass a nonblock bucket into the output filters */
         apr_status_t rv;
+        apr_bucket *nonblock;
         ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
-        while (output_filter->next != NULL) {
-            output_filter = output_filter->next;
-        }
-        rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
-        if (rv != APR_SUCCESS) {
+        nonblock = ap_bucket_nonblock_create(c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(cs->bb, nonblock);
+        rv = ap_pass_brigade(c->output_filters, cs->bb);
+        if (rv != APR_SUCCESS && rv != APR_EAGAIN) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
                           "network write failure in core output filter");
             cs->pub.state = CONN_STATE_LINGER;
         }
-        else if (c->data_in_output_filters) {
+        else if (c->data_in_output_filters || rv == APR_EAGAIN) {
             /* Still in WRITE_COMPLETION_STATE:
              * Set a write timeout for this connection, and let the
              * event thread poll for writeability.
Index: server/mpm/simple/simple_io.c
===================================================================
--- server/mpm/simple/simple_io.c	(revision 1543235)
+++ server/mpm/simple/simple_io.c	(working copy)
@@ -26,6 +26,7 @@
 #include "http_main.h"
 #include "scoreboard.h"
 #include "http_vhost.h"
+#include "mpm_common.h"
 
 APLOG_USE_MODULE(mpm_simple);
 
@@ -92,20 +93,19 @@
         }
 
         if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
-            ap_filter_t *output_filter = c->output_filters;
-            while (output_filter->next != NULL) {
-                output_filter = output_filter->next;
-            }
+            /* pass a nonblock bucket into the output filters */
+            apr_bucket *nonblock;
 
-            rv = output_filter->frec->filter_func.out_func(output_filter,
-                                                           NULL);
+            nonblock = ap_bucket_nonblock_create(c->bucket_alloc);
+            APR_BRIGADE_INSERT_TAIL(scon->bb, nonblock);
+            rv = ap_pass_brigade(c->output_filters, scon->bb);
 
-            if (rv != APR_SUCCESS) {
+            if (rv != APR_SUCCESS && rv != APR_EAGAIN) {
                 ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(00249)
                              "network write failure in core output filter");
                 scon->cs.state = CONN_STATE_LINGER;
             }
-            else if (c->data_in_output_filters) {
+            else if (c->data_in_output_filters || rv == APR_EAGAIN) {
                 /* Still in WRITE_COMPLETION_STATE:
                  * Set a write timeout for this connection, and let the
                  * event thread poll for writeability.
@@ -217,6 +217,8 @@
                                        conn_id, sbh, scon->ba);
     /* XXX: handle failure */
 
+    scon->bb = apr_brigade_create(scon->pool, scon->ba);
+
     scon->c->cs = &scon->cs;
     sb = apr_pcalloc(scon->pool, sizeof(simple_sb_t));
 
Index: server/mpm/simple/simple_types.h
===================================================================
--- server/mpm/simple/simple_types.h	(revision 1543235)
+++ server/mpm/simple/simple_types.h	(working copy)
@@ -122,6 +122,8 @@
     apr_socket_t *sock;
     apr_bucket_alloc_t *ba;
     conn_rec *c;
+    /** empty brigade for write completion */
+    apr_bucket_brigade *bb;
     /** poll file descriptor information */
     apr_pollfd_t pfd;
     /** public parts of the connection state */
Index: server/nonblock_bucket.c
===================================================================
--- server/nonblock_bucket.c	(revision 0)
+++ server/nonblock_bucket.c	(working copy)
@@ -0,0 +1,55 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "httpd.h"
+#include "mpm_common.h"
+
+static apr_status_t nonblock_bucket_read(apr_bucket *b, const char **str,
+                                    apr_size_t *len, apr_read_type_e block)
+{
+    *str = NULL;            /* no payload: b and block are not consulted */
+    *len = 0;               /* always reports a zero-length read */
+    return APR_SUCCESS;
+}
+
+AP_DECLARE(apr_bucket *) ap_bucket_nonblock_make(apr_bucket *b)
+{
+    b->length      = 0;     /* carries no bytes */
+    b->start       = 0;
+    b->data        = NULL;  /* no private data attached */
+    b->type        = &ap_bucket_type_nonblock;  /* marks it as NONBLOCK */
+
+    return b;               /* the same bucket, retyped in place */
+}
+
+AP_DECLARE(apr_bucket *) ap_bucket_nonblock_create(apr_bucket_alloc_t *list)
+{
+    apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);  /* used unchecked */
+
+    APR_BUCKET_INIT(b);
+    b->free = apr_bucket_free;  /* pairs with apr_bucket_alloc() above */
+    b->list = list;
+    return ap_bucket_nonblock_make(b);  /* fill in the NONBLOCK fields */
+}
+
+AP_DECLARE_DATA const apr_bucket_type_t ap_bucket_type_nonblock = {
+    "NONBLOCK", 5, APR_BUCKET_METADATA,  /* name; 5 callbacks follow */
+    apr_bucket_destroy_noop,             /* destroy */
+    nonblock_bucket_read,                /* read: always empty */
+    apr_bucket_setaside_noop,            /* setaside */
+    apr_bucket_split_notimpl,            /* split */
+    apr_bucket_simple_copy               /* copy */
+};
Index: include/http_core.h
===================================================================
--- include/http_core.h	(revision 1543235)
+++ include/http_core.h	(working copy)
@@ -697,9 +697,9 @@
 apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
                                   ap_input_mode_t mode, apr_read_type_e block,
                                   apr_off_t readbytes);
+apr_status_t ap_core_helper_filter(ap_filter_t *f, apr_bucket_brigade *b);
 apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *b);
 
-
 AP_DECLARE(const char*) ap_get_server_protocol(server_rec* s);
 AP_DECLARE(void) ap_set_server_protocol(server_rec* s, const char* proto);
 
Index: include/mpm_common.h
===================================================================
--- include/mpm_common.h	(revision 1543235)
+++ include/mpm_common.h	(working copy)
@@ -429,6 +429,33 @@
  */
 void mpm_common_pre_config(apr_pool_t *pconf);
 
+/** Non Blocking (NONBLOCK) bucket */
+AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_nonblock;
+
+/**
+ * Determine if a bucket is a Non Blocking (NONBLOCK) bucket
+ * @param e The bucket to inspect
+ * @return true or false
+ */
+#define AP_BUCKET_IS_NONBLOCK(e)         (e->type == &ap_bucket_type_nonblock)
+
+/**
+ * Make the bucket passed in a Non Blocking (NONBLOCK) bucket
+ * @param b The bucket to make into a NONBLOCK bucket
+ * @return The new bucket, or NULL if allocation failed
+ */
+AP_DECLARE(apr_bucket *) ap_bucket_nonblock_make(apr_bucket *b);
+
+/**
+ * Create a bucket indicating that subsequent writes on the brigade should
+ * be Non Blocking (NONBLOCK). This indicates that only some of the data
+ * in the brigade need be written, the rest can be setaside and written in
+ * a future call.
+ * @param list The freelist from which this bucket should be allocated
+ * @return The new bucket, or NULL if allocation failed
+ */
+AP_DECLARE(apr_bucket *) ap_bucket_nonblock_create(apr_bucket_alloc_t *list);
+
 #ifdef __cplusplus
 }
 #endif
