This patch makes the code for talking to the back end fastcgi process
use apr_poll, interleaving reads and writes as they become ready.
Note that it doesn't actually switch to nonblocking reads/writes, but
that can be done in another pass.  This also switches to AP_IOBUFSIZE
for the read/write buffers as suggested by Justin and, more
importantly, adds support for FCGI_STDOUT records so we can actually
send data from the back end fastcgi process to the front end client.

Log follows, patch attached.  Comments, as always, are more than welcome.

-garrett

Add support for reading FCGI_STDOUT and FCGI_END_REQUEST records from the
back end fastcgi process.  This includes switching to a poll-based dispatch
loop that handles interleaved reads and writes.

* modules/proxy/mod_proxy_fcgi.c
  (MAX_INPUT_BYTES): Removed, we now use AP_IOBUFSIZE.
  (send_stdin): Removed, code incorporated into dispatch routine.
  (dispatch): New poll-based dispatch loop that handles both reads and
   writes.
  (fcgi_do_request): Call new dispatch routine.  Return OK if we get through
   without errors.
Index: modules/proxy/mod_proxy_fcgi.c
===================================================================
--- modules/proxy/mod_proxy_fcgi.c	(revision 359241)
+++ modules/proxy/mod_proxy_fcgi.c	(working copy)
@@ -304,83 +304,213 @@
     return apr_socket_sendv(conn->sock, vec, 1, &len);
 }
 
-/*
- * An arbitrary buffer size for reading the stdin data from the client.
- *
- * Need to find a "better" value here, or at least justify the current
- * value somehow.
- */
-#define MAX_INPUT_BYTES 1024
-
-static apr_status_t send_stdin(proxy_conn_rec *conn, request_rec *r,
-                               int request_id)
+static apr_status_t dispatch(proxy_conn_rec *conn, request_rec *r,
+                             int request_id)
 {
-    apr_bucket_brigade *input_brigade;
+    apr_bucket_brigade *input_brigade, *output_brigade;
     apr_status_t rv = APR_SUCCESS;
+    char readbuf[AP_IOBUFSIZE];
+    apr_size_t readbuflen;
     struct iovec vec[2];
     fcgi_header header;
+    apr_pollfd_t pfd;
     int done = 0;
 
     fill_in_header(&header, FCGI_STDIN, request_id);
 
+    pfd.desc_type = APR_POLL_SOCKET;
+    pfd.desc.s = conn->sock;
+    pfd.p = r->pool;
+    pfd.reqevents = APR_POLLIN | APR_POLLOUT;
+
     input_brigade = apr_brigade_create(r->pool, r->connection->bucket_alloc);
+    output_brigade = apr_brigade_create(r->pool, r->connection->bucket_alloc);
 
     while (! done) {
-        char buff[MAX_INPUT_BYTES];
-        apr_size_t buflen, len;
+        apr_size_t len;
+        int n;
 
-        rv = ap_get_brigade(r->input_filters, input_brigade,
-                            AP_MODE_READBYTES, APR_BLOCK_READ,
-                            MAX_INPUT_BYTES);
+        rv = apr_poll(&pfd, 1, &n, -1);
         if (rv != APR_SUCCESS) {
             break;
         }
 
-        if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) {
-            done = 1;
-        }
+        if (pfd.rtnevents & APR_POLLOUT) {
+            char writebuf[AP_IOBUFSIZE];
+            apr_size_t writebuflen;
+            int last_stdin = 0;
 
-        buflen = sizeof(buff);
+            rv = ap_get_brigade(r->input_filters, input_brigade,
+                                AP_MODE_READBYTES, APR_BLOCK_READ,
+                                AP_IOBUFSIZE);
+            if (rv != APR_SUCCESS) {
+                break;
+            }
 
-        rv = apr_brigade_flatten(input_brigade, buff, &buflen);
+            if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) {
+                last_stdin = 1;
+            }
 
-        apr_brigade_cleanup(input_brigade);
+            writebuflen = sizeof(writebuf);
 
-        if (rv != APR_SUCCESS) {
-            break;
-        }
+            rv = apr_brigade_flatten(input_brigade, writebuf, &writebuflen);
 
-        header.contentLengthB1 = ((buflen >> 8) & 0xff);
-        header.contentLengthB0 = ((buflen) & 0xff); 
+            apr_brigade_cleanup(input_brigade);
 
-        vec[0].iov_base = &header;
-        vec[0].iov_len = sizeof(header);
-        vec[1].iov_base = buff;
-        vec[1].iov_len = buflen;
+            if (rv != APR_SUCCESS) {
+                break;
+            }
 
-        rv = apr_socket_sendv(conn->sock, vec, 2, &len);
-        if (rv != APR_SUCCESS) {
-            break;
+            header.contentLengthB1 = ((writebuflen >> 8) & 0xff);
+            header.contentLengthB0 = ((writebuflen) & 0xff); 
+
+            vec[0].iov_base = &header;
+            vec[0].iov_len = sizeof(header);
+            vec[1].iov_base = writebuf;
+            vec[1].iov_len = writebuflen;
+
+            /* XXX This should be nonblocking, and if we don't write all
+             *     the data we need to keep track of that fact so we can
+             *     get to it next time through. */
+            rv = apr_socket_sendv(conn->sock, vec, 2, &len);
+            if (rv != APR_SUCCESS) {
+                break;
+            }
+
+            /* XXX AJP updates conn->worker->s->transferred here, do we need
+             *     to? */
+
+            if (last_stdin) {
+                pfd.reqevents = APR_POLLIN; /* Done with input data */
+
+                header.contentLengthB1 = 0;
+                header.contentLengthB0 = 0;
+
+                vec[0].iov_base = &header;
+                vec[0].iov_len = sizeof(header);
+
+                rv = apr_socket_sendv(conn->sock, vec, 1, &len);
+            }
         }
 
-        /* XXX AJP updates conn->worker->s->transferred here, do we need to? */
-    }
+        if (pfd.rtnevents & APR_POLLIN) {
+            apr_size_t clen = 0;
+            int rid, type = 0;
+            char plen = 0;
 
-    /* If we got here successfully it means we sent all the data, so we need
-     * to send the final empty record to signify the end of the stream. */
-    if (rv == APR_SUCCESS) {
-        apr_size_t len;
+            /* First, we grab the header... */
+            readbuflen = 8;
 
-        header.contentLengthB1 = 0;
-        header.contentLengthB0 = 0;
+            rv = apr_socket_recv(conn->sock, readbuf, &readbuflen);
+            if (rv != APR_SUCCESS) {
+                break;
+            }
 
-        vec[0].iov_base = &header;
-        vec[0].iov_len = sizeof(header);
+            if (readbuflen != 8) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+                             "proxy: FCGI: Failed to read entire header");
+                rv = APR_EINVAL;
+                break;
+            }
 
-        rv = apr_socket_sendv(conn->sock, vec, 1, &len);
+            if (readbuf[0] != FCGI_VERSION) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+                             "proxy: FCGI: Got bogus version %d",
+                             (int) readbuf[0]);
+                rv = APR_EINVAL;
+                break;
+            }
+
+            type = readbuf[1];
+
+            rid |= readbuf[2] << 8;
+            rid |= readbuf[3] << 0;
+
+            if (rid != request_id) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+                             "proxy: FCGI: Got bogus rid %d, expected %d",
+                             rid, request_id);
+                rv = APR_EINVAL;
+                break;
+            }
+
+            clen |= readbuf[4] << 8;
+            clen |= readbuf[5] << 0;
+
+            plen = readbuf[6];
+
+            /* XXX We need support for content length > buffer size, but for
+             *     now just punt. */
+            if ((clen + plen) > sizeof(readbuf)) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+                             "proxy: FCGI: back end server send more data "
+                             "than fits in buffer");
+                rv = APR_EINVAL;
+                break;
+            }
+
+            /* Now get the actual data.  Yes it sucks to do this in a second
+             * recv call, this will eventually change when we move to real
+             * nonblocking recv calls. */
+            if ((clen + plen) != 0) {
+                readbuflen = clen + plen;
+
+                rv = apr_socket_recv(conn->sock, readbuf, &readbuflen);
+                if (rv != APR_SUCCESS) {
+                    break;
+                }
+            }
+
+            switch (type) {
+            case FCGI_STDOUT:
+                if (clen != 0) {
+                    apr_bucket *b =
+                        apr_bucket_transient_create
+                            (readbuf, clen, r->connection->bucket_alloc);
+
+                    APR_BRIGADE_INSERT_TAIL(output_brigade, b);
+
+                    /* XXX Update conn->worker->s->read like AJP does */
+
+                    rv = ap_pass_brigade(r->output_filters, output_brigade);
+                    if (rv != APR_SUCCESS) {
+                        break;
+                    }
+
+                    apr_brigade_cleanup(output_brigade);
+                } else {
+                    apr_bucket *b =
+                        apr_bucket_eos_create(r->connection->bucket_alloc);
+
+                    APR_BRIGADE_INSERT_TAIL(output_brigade, b);
+
+                    rv = ap_pass_brigade(r->output_filters, output_brigade);
+                    if (rv != APR_SUCCESS) {
+                        break;
+                    }
+
+                    /* XXX Why don't we cleanup here?  (logic from AJP) */
+                }
+                break;
+
+            case FCGI_STDERR:
+                /* XXX TODO FCGI_STDERR gets written to the log file. */
+                break;
+
+            case FCGI_END_REQUEST:
+                done = 1;
+                break;
+
+            default:
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+                             "proxy: FCGI: Got bogus record %d", type);
+                break;
+            }
+        }
     }
 
     apr_brigade_destroy(input_brigade);
+    apr_brigade_destroy(output_brigade);
 
     return rv;
 }
@@ -421,8 +551,8 @@
         return HTTP_SERVICE_UNAVAILABLE;
     }
 
-    /* Step 3: Send Request Body via FCGI_STDIN */
-    rv = send_stdin(conn, r, request_id);
+    /* Step 3: Read records from the back end server and handle them. */
+    rv = dispatch(conn, r, request_id);
     if (rv != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, r->server,
                      "proxy: FCGI: Failed writing STDIN to %s:",
@@ -430,11 +560,7 @@
         return HTTP_SERVICE_UNAVAILABLE;
     }
 
-    /* Step 4: Read for CGI_STDOUT|CGI_STDERR */
-    /* Step 5: Parse reply headers. */
-    /* Step 6: Stream reply body. */
-    /* Step 7: Read FCGI_END_REQUEST -> Done */
-    return HTTP_SERVICE_UNAVAILABLE;
+    return OK;
 }
 
 /*

Reply via email to