This patch makes the code that talks to the back end fastcgi process use apr_poll(), interleaving reads and writes as the socket becomes ready. Note that it doesn't actually switch to nonblocking reads/writes yet; that can be done in another pass. It also switches to AP_IOBUFSIZE for the read/write buffers, as Justin suggested, and, more importantly, adds support for FCGI_STDOUT records so we can actually send data from the back end fastcgi process to the front end client.
Log follows, patch attached.  Comments, as always, are more than welcome.

-garrett

Add support for reading FCGI_STDOUT and FCGI_END_REQUEST records from the
back end fastcgi process.  This includes switching to a poll-based dispatch
loop that handles interleaved reads and writes.

* modules/proxy/mod_proxy_fcgi.c
  (MAX_INPUT_BYTES): Removed, we now use AP_IOBUFSIZE.
  (send_stdin): Removed, code incorporated into dispatch routine.
  (dispatch): New, poll-based dispatch loop that handles both reads and
   writes.
  (fcgi_do_request): Call new dispatch routine.  Return OK if we get through
   without errors.
Index: modules/proxy/mod_proxy_fcgi.c
===================================================================
--- modules/proxy/mod_proxy_fcgi.c	(revision 359241)
+++ modules/proxy/mod_proxy_fcgi.c	(working copy)
@@ -304,83 +304,254 @@
     return apr_socket_sendv(conn->sock, vec, 1, &len);
 }
 
-/*
- * An arbitrary buffer size for reading the stdin data from the client.
- *
- * Need to find a "better" value here, or at least justify the current
- * value somehow.
- */
-#define MAX_INPUT_BYTES 1024
-
-static apr_status_t send_stdin(proxy_conn_rec *conn, request_rec *r,
-                               int request_id)
+/*
+ * Receive exactly *len bytes from sock into buf.
+ *
+ * apr_socket_recv() may return fewer bytes than requested even on a
+ * blocking socket, so a single call is not enough to read a complete
+ * FastCGI header or record body.  On return *len holds the number of
+ * bytes actually read; it is only short of the request when an error
+ * is returned (APR_EOF if the back end closed the connection early).
+ */
+static apr_status_t recv_full(apr_socket_t *sock, char *buf,
+                              apr_size_t *len)
+{
+    apr_size_t want = *len;
+    apr_size_t got = 0;
+    apr_status_t rv = APR_SUCCESS;
+
+    while (got < want) {
+        apr_size_t chunk = want - got;
+
+        rv = apr_socket_recv(sock, buf + got, &chunk);
+        got += chunk;
+        if (rv != APR_SUCCESS || chunk == 0) {
+            break;
+        }
+    }
+
+    *len = got;
+    if (got == want) {
+        /* apr_socket_recv() may report an error (typically APR_EOF)
+         * together with the final bytes; the read itself succeeded. */
+        return APR_SUCCESS;
+    }
+
+    return (rv != APR_SUCCESS) ? rv : APR_EOF;
+}
+
+/*
+ * Poll the back end connection, sending the request body as FCGI_STDIN
+ * records while the socket is writable and handling incoming records as
+ * they arrive (FCGI_STDOUT is passed to the client, FCGI_STDERR is
+ * currently ignored).  Returns APR_SUCCESS after FCGI_END_REQUEST.
+ */
+static apr_status_t dispatch(proxy_conn_rec *conn, request_rec *r,
+                             int request_id)
 {
-    apr_bucket_brigade *input_brigade;
+    apr_bucket_brigade *input_brigade, *output_brigade;
     apr_status_t rv = APR_SUCCESS;
+    char readbuf[AP_IOBUFSIZE];
+    apr_size_t readbuflen;
     struct iovec vec[2];
     fcgi_header header;
+    apr_pollfd_t pfd;
     int done = 0;
 
     fill_in_header(&header, FCGI_STDIN, request_id);
 
+    pfd.desc_type = APR_POLL_SOCKET;
+    pfd.desc.s = conn->sock;
+    pfd.p = r->pool;
+    pfd.reqevents = APR_POLLIN | APR_POLLOUT;
+
     input_brigade = apr_brigade_create(r->pool, r->connection->bucket_alloc);
+    output_brigade = apr_brigade_create(r->pool, r->connection->bucket_alloc);
 
     while (! done) {
-        char buff[MAX_INPUT_BYTES];
-        apr_size_t buflen, len;
+        apr_size_t len;
+        apr_int32_t n;
 
-        rv = ap_get_brigade(r->input_filters, input_brigade,
-                            AP_MODE_READBYTES, APR_BLOCK_READ,
-                            MAX_INPUT_BYTES);
+        rv = apr_poll(&pfd, 1, &n, -1);
         if (rv != APR_SUCCESS) {
             break;
         }
 
-        if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) {
-            done = 1;
-        }
-
-        buflen = sizeof(buff);
-
-        rv = apr_brigade_flatten(input_brigade, buff, &buflen);
-
-        apr_brigade_cleanup(input_brigade);
-
-        if (rv != APR_SUCCESS) {
-            break;
-        }
-
-        header.contentLengthB1 = ((buflen >> 8) & 0xff);
-        header.contentLengthB0 = ((buflen) & 0xff);
-
-        vec[0].iov_base = &header;
-        vec[0].iov_len = sizeof(header);
-        vec[1].iov_base = buff;
-        vec[1].iov_len = buflen;
-
-        rv = apr_socket_sendv(conn->sock, vec, 2, &len);
-        if (rv != APR_SUCCESS) {
-            break;
-        }
-
-        /* XXX AJP updates conn->worker->s->transferred here, do we need to? */
-    }
-
-    /* If we got here successfully it means we sent all the data, so we need
-     * to send the final empty record to signify the end of the stream. */
-    if (rv == APR_SUCCESS) {
-        apr_size_t len;
-
-        header.contentLengthB1 = 0;
-        header.contentLengthB0 = 0;
-
-        vec[0].iov_base = &header;
-        vec[0].iov_len = sizeof(header);
-
-        rv = apr_socket_sendv(conn->sock, vec, 1, &len);
-    }
+        if (pfd.rtnevents & APR_POLLOUT) {
+            char writebuf[AP_IOBUFSIZE];
+            apr_size_t writebuflen;
+            int last_stdin = 0;
+
+            rv = ap_get_brigade(r->input_filters, input_brigade,
+                                AP_MODE_READBYTES, APR_BLOCK_READ,
+                                AP_IOBUFSIZE);
+            if (rv != APR_SUCCESS) {
+                break;
+            }
+
+            /* Guard against an empty brigade: APR_BRIGADE_LAST() of an
+             * empty brigade is the sentinel, not a real bucket. */
+            if (!APR_BRIGADE_EMPTY(input_brigade)
+                && APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) {
+                last_stdin = 1;
+            }
+
+            writebuflen = sizeof(writebuf);
+
+            rv = apr_brigade_flatten(input_brigade, writebuf, &writebuflen);
+
+            apr_brigade_cleanup(input_brigade);
+
+            if (rv != APR_SUCCESS) {
+                break;
+            }
+
+            /* A zero length FCGI_STDIN record marks the end of the request
+             * body, so only send a data record if there is actual data. */
+            if (writebuflen > 0) {
+                header.contentLengthB1 = ((writebuflen >> 8) & 0xff);
+                header.contentLengthB0 = ((writebuflen) & 0xff);
+
+                vec[0].iov_base = &header;
+                vec[0].iov_len = sizeof(header);
+                vec[1].iov_base = writebuf;
+                vec[1].iov_len = writebuflen;
+
+                /* XXX This should be nonblocking, and if we don't write all
+                 * the data we need to keep track of that fact so we can
+                 * get to it next time through. */
+                rv = apr_socket_sendv(conn->sock, vec, 2, &len);
+                if (rv != APR_SUCCESS) {
+                    break;
+                }
+
+                /* XXX AJP updates conn->worker->s->transferred here, do we
+                 * need to? */
+            }
+
+            if (last_stdin) {
+                pfd.reqevents = APR_POLLIN; /* Done with input data */
+
+                header.contentLengthB1 = 0;
+                header.contentLengthB0 = 0;
+
+                vec[0].iov_base = &header;
+                vec[0].iov_len = sizeof(header);
+
+                rv = apr_socket_sendv(conn->sock, vec, 1, &len);
+                if (rv != APR_SUCCESS) {
+                    break;
+                }
+            }
+        }
+
+        if (pfd.rtnevents & APR_POLLIN) {
+            const unsigned char *hdr = (const unsigned char *) readbuf;
+            apr_size_t clen, plen;
+            int rid, type;
+
+            /* First, we grab the header... */
+            readbuflen = 8;
+
+            rv = recv_full(conn->sock, readbuf, &readbuflen);
+            if (rv != APR_SUCCESS) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, rv, r->server,
+                             "proxy: FCGI: Failed to read entire header");
+                break;
+            }
+
+            /* Decode through unsigned char: plain char may be signed, and
+             * sign extension would corrupt ids and lengths >= 0x80. */
+            if (hdr[0] != FCGI_VERSION) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+                             "proxy: FCGI: Got bogus version %d",
+                             (int) hdr[0]);
+                rv = APR_EINVAL;
+                break;
+            }
+
+            type = hdr[1];
+            rid = (hdr[2] << 8) | hdr[3];
+
+            if (rid != request_id) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+                             "proxy: FCGI: Got bogus rid %d, expected %d",
+                             rid, request_id);
+                rv = APR_EINVAL;
+                break;
+            }
+
+            clen = ((apr_size_t) hdr[4] << 8) | hdr[5];
+            plen = hdr[6];
+
+            /* XXX We need support for content length > buffer size, but for
+             * now just punt. */
+            if ((clen + plen) > sizeof(readbuf)) {
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+                             "proxy: FCGI: back end server sent more data "
+                             "than fits in buffer");
+                rv = APR_EINVAL;
+                break;
+            }
+
+            /* Now get the actual data.  Yes it sucks to do this in a second
+             * recv call, this will eventually change when we move to real
+             * nonblocking recv calls. */
+            if ((clen + plen) != 0) {
+                readbuflen = clen + plen;
+
+                rv = recv_full(conn->sock, readbuf, &readbuflen);
+                if (rv != APR_SUCCESS) {
+                    break;
+                }
+            }
+
+            switch (type) {
+            case FCGI_STDOUT:
+                if (clen != 0) {
+                    apr_bucket *b =
+                        apr_bucket_transient_create
+                        (readbuf, clen, r->connection->bucket_alloc);
+
+                    APR_BRIGADE_INSERT_TAIL(output_brigade, b);
+
+                    /* XXX Update conn->worker->s->read like AJP does */
+                } else {
+                    /* An empty FCGI_STDOUT record ends the output stream. */
+                    apr_bucket *b =
+                        apr_bucket_eos_create(r->connection->bucket_alloc);
+
+                    APR_BRIGADE_INSERT_TAIL(output_brigade, b);
+                }
+
+                rv = ap_pass_brigade(r->output_filters, output_brigade);
+                apr_brigade_cleanup(output_brigade);
+                break;
+
+            case FCGI_STDERR:
+                /* XXX TODO FCGI_STDERR gets written to the log file. */
+                break;
+
+            case FCGI_END_REQUEST:
+                done = 1;
+                break;
+
+            default:
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+                             "proxy: FCGI: Got bogus record %d", type);
+                break;
+            }
+
+            /* A "break" inside the switch only leaves the switch, so check
+             * here whether passing data to the client failed. */
+            if (rv != APR_SUCCESS) {
+                break;
+            }
+        }
+    }
 
     apr_brigade_destroy(input_brigade);
+    apr_brigade_destroy(output_brigade);
     return rv;
 }
 
@@ -421,8 +592,8 @@
         return HTTP_SERVICE_UNAVAILABLE;
     }
 
-    /* Step 3: Send Request Body via FCGI_STDIN */
-    rv = send_stdin(conn, r, request_id);
+    /* Step 3: Read records from the back end server and handle them. */
+    rv = dispatch(conn, r, request_id);
     if (rv != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, r->server,
                      "proxy: FCGI: Failed writing STDIN to %s:",
@@ -430,11 +601,9 @@
         return HTTP_SERVICE_UNAVAILABLE;
     }
 
-    /* Step 4: Read for CGI_STDOUT|CGI_STDERR */
-    /* Step 5: Parse reply headers. */
-    /* Step 6: Stream reply body. */
-    /* Step 7: Read FCGI_END_REQUEST -> Done */
-    return HTTP_SERVICE_UNAVAILABLE;
+    /* XXX The CGI reply headers at the start of FCGI_STDOUT are not parsed
+     * yet; dispatch() passes them through to the client as body data. */
+    return OK;
 }
 
 /*