This patch makes the code for talking to the back-end FastCGI process
use apr_poll, interleaving reads and writes as they become ready.
Note that it doesn't actually switch to nonblocking reads/writes yet;
that can be done in another pass. This also switches to AP_IOBUFSIZE
for the read/write buffers, as suggested by Justin, and, more
importantly, adds support for FCGI_STDOUT records so we can actually
send data from the back-end FastCGI process to the front-end client.
Log follows, patch attached. Comments, as always, are more than welcome.
-garrett
Add support for reading FCGI_STDOUT and FCGI_END_REQUEST records from the
back-end FastCGI process. This includes switching to a poll-based dispatch
loop that handles interleaved reads and writes.
* modules/proxy/mod_proxy_fcgi.c
(MAX_INPUT_BYTES): Removed, we now use AP_IOBUFSIZE.
(send_stdin): Removed, code incorporated into dispatch routine.
(dispatch): New poll-based dispatch loop that handles both reads and
writes.
(fcgi_do_request): Call new dispatch routine. Return OK if we get through
without errors.
Index: modules/proxy/mod_proxy_fcgi.c
===================================================================
--- modules/proxy/mod_proxy_fcgi.c (revision 359241)
+++ modules/proxy/mod_proxy_fcgi.c (working copy)
@@ -304,83 +304,213 @@
return apr_socket_sendv(conn->sock, vec, 1, &len);
}
-/*
- * An arbitrary buffer size for reading the stdin data from the client.
- *
- * Need to find a "better" value here, or at least justify the current
- * value somehow.
- */
-#define MAX_INPUT_BYTES 1024
-
-static apr_status_t send_stdin(proxy_conn_rec *conn, request_rec *r,
- int request_id)
+static apr_status_t dispatch(proxy_conn_rec *conn, request_rec *r,
+ int request_id)
{
- apr_bucket_brigade *input_brigade;
+ apr_bucket_brigade *input_brigade, *output_brigade;
apr_status_t rv = APR_SUCCESS;
+ char readbuf[AP_IOBUFSIZE];
+ apr_size_t readbuflen;
struct iovec vec[2];
fcgi_header header;
+ apr_pollfd_t pfd;
int done = 0;
fill_in_header(&header, FCGI_STDIN, request_id);
+ pfd.desc_type = APR_POLL_SOCKET;
+ pfd.desc.s = conn->sock;
+ pfd.p = r->pool;
+ pfd.reqevents = APR_POLLIN | APR_POLLOUT;
+
input_brigade = apr_brigade_create(r->pool, r->connection->bucket_alloc);
+ output_brigade = apr_brigade_create(r->pool, r->connection->bucket_alloc);
while (! done) {
- char buff[MAX_INPUT_BYTES];
- apr_size_t buflen, len;
+ apr_size_t len;
+ int n;
- rv = ap_get_brigade(r->input_filters, input_brigade,
- AP_MODE_READBYTES, APR_BLOCK_READ,
- MAX_INPUT_BYTES);
+ rv = apr_poll(&pfd, 1, &n, -1);
if (rv != APR_SUCCESS) {
break;
}
- if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) {
- done = 1;
- }
+ if (pfd.rtnevents & APR_POLLOUT) {
+ char writebuf[AP_IOBUFSIZE];
+ apr_size_t writebuflen;
+ int last_stdin = 0;
- buflen = sizeof(buff);
+ rv = ap_get_brigade(r->input_filters, input_brigade,
+ AP_MODE_READBYTES, APR_BLOCK_READ,
+ AP_IOBUFSIZE);
+ if (rv != APR_SUCCESS) {
+ break;
+ }
- rv = apr_brigade_flatten(input_brigade, buff, &buflen);
+ if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) {
+ last_stdin = 1;
+ }
- apr_brigade_cleanup(input_brigade);
+ writebuflen = sizeof(writebuf);
- if (rv != APR_SUCCESS) {
- break;
- }
+ rv = apr_brigade_flatten(input_brigade, writebuf, &writebuflen);
- header.contentLengthB1 = ((buflen >> 8) & 0xff);
- header.contentLengthB0 = ((buflen) & 0xff);
+ apr_brigade_cleanup(input_brigade);
- vec[0].iov_base = &header;
- vec[0].iov_len = sizeof(header);
- vec[1].iov_base = buff;
- vec[1].iov_len = buflen;
+ if (rv != APR_SUCCESS) {
+ break;
+ }
- rv = apr_socket_sendv(conn->sock, vec, 2, &len);
- if (rv != APR_SUCCESS) {
- break;
+ header.contentLengthB1 = ((writebuflen >> 8) & 0xff);
+ header.contentLengthB0 = ((writebuflen) & 0xff);
+
+ vec[0].iov_base = &header;
+ vec[0].iov_len = sizeof(header);
+ vec[1].iov_base = writebuf;
+ vec[1].iov_len = writebuflen;
+
+ /* XXX This should be nonblocking, and if we don't write all
+ * the data we need to keep track of that fact so we can
+ * get to it next time through. */
+ rv = apr_socket_sendv(conn->sock, vec, 2, &len);
+ if (rv != APR_SUCCESS) {
+ break;
+ }
+
+ /* XXX AJP updates conn->worker->s->transferred here, do we need
+ * to? */
+
+ if (last_stdin) {
+ pfd.reqevents = APR_POLLIN; /* Done with input data */
+
+ header.contentLengthB1 = 0;
+ header.contentLengthB0 = 0;
+
+ vec[0].iov_base = &header;
+ vec[0].iov_len = sizeof(header);
+
+ rv = apr_socket_sendv(conn->sock, vec, 1, &len);
+ }
}
- /* XXX AJP updates conn->worker->s->transferred here, do we need to? */
- }
+ if (pfd.rtnevents & APR_POLLIN) {
+ apr_size_t clen = 0;
+ int rid, type = 0;
+ char plen = 0;
- /* If we got here successfully it means we sent all the data, so we need
- * to send the final empty record to signify the end of the stream. */
- if (rv == APR_SUCCESS) {
- apr_size_t len;
+ /* First, we grab the header... */
+ readbuflen = 8;
- header.contentLengthB1 = 0;
- header.contentLengthB0 = 0;
+ rv = apr_socket_recv(conn->sock, readbuf, &readbuflen);
+ if (rv != APR_SUCCESS) {
+ break;
+ }
- vec[0].iov_base = &header;
- vec[0].iov_len = sizeof(header);
+ if (readbuflen != 8) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+ "proxy: FCGI: Failed to read entire header");
+ rv = APR_EINVAL;
+ break;
+ }
- rv = apr_socket_sendv(conn->sock, vec, 1, &len);
+ if (readbuf[0] != FCGI_VERSION) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+ "proxy: FCGI: Got bogus version %d",
+ (int) readbuf[0]);
+ rv = APR_EINVAL;
+ break;
+ }
+
+ type = readbuf[1];
+
+ rid |= readbuf[2] << 8;
+ rid |= readbuf[3] << 0;
+
+ if (rid != request_id) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+ "proxy: FCGI: Got bogus rid %d, expected %d",
+ rid, request_id);
+ rv = APR_EINVAL;
+ break;
+ }
+
+ clen |= readbuf[4] << 8;
+ clen |= readbuf[5] << 0;
+
+ plen = readbuf[6];
+
+ /* XXX We need support for content length > buffer size, but for
+ * now just punt. */
+ if ((clen + plen) > sizeof(readbuf)) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+ "proxy: FCGI: back end server send more data "
+ "than fits in buffer");
+ rv = APR_EINVAL;
+ break;
+ }
+
+ /* Now get the actual data. Yes it sucks to do this in a second
+ * recv call, this will eventually change when we move to real
+ * nonblocking recv calls. */
+ if ((clen + plen) != 0) {
+ readbuflen = clen + plen;
+
+ rv = apr_socket_recv(conn->sock, readbuf, &readbuflen);
+ if (rv != APR_SUCCESS) {
+ break;
+ }
+ }
+
+ switch (type) {
+ case FCGI_STDOUT:
+ if (clen != 0) {
+ apr_bucket *b =
+ apr_bucket_transient_create
+ (readbuf, clen, r->connection->bucket_alloc);
+
+ APR_BRIGADE_INSERT_TAIL(output_brigade, b);
+
+ /* XXX Update conn->worker->s->read like AJP does */
+
+ rv = ap_pass_brigade(r->output_filters, output_brigade);
+ if (rv != APR_SUCCESS) {
+ break;
+ }
+
+ apr_brigade_cleanup(output_brigade);
+ } else {
+ apr_bucket *b =
+ apr_bucket_eos_create(r->connection->bucket_alloc);
+
+ APR_BRIGADE_INSERT_TAIL(output_brigade, b);
+
+ rv = ap_pass_brigade(r->output_filters, output_brigade);
+ if (rv != APR_SUCCESS) {
+ break;
+ }
+
+ /* XXX Why don't we cleanup here? (logic from AJP) */
+ }
+ break;
+
+ case FCGI_STDERR:
+ /* XXX TODO FCGI_STDERR gets written to the log file. */
+ break;
+
+ case FCGI_END_REQUEST:
+ done = 1;
+ break;
+
+ default:
+ ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
+ "proxy: FCGI: Got bogus record %d", type);
+ break;
+ }
+ }
}
apr_brigade_destroy(input_brigade);
+ apr_brigade_destroy(output_brigade);
return rv;
}
@@ -421,8 +551,8 @@
return HTTP_SERVICE_UNAVAILABLE;
}
- /* Step 3: Send Request Body via FCGI_STDIN */
- rv = send_stdin(conn, r, request_id);
+ /* Step 3: Read records from the back end server and handle them. */
+ rv = dispatch(conn, r, request_id);
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, r->server,
"proxy: FCGI: Failed writing STDIN to %s:",
@@ -430,11 +560,7 @@
return HTTP_SERVICE_UNAVAILABLE;
}
- /* Step 4: Read for CGI_STDOUT|CGI_STDERR */
- /* Step 5: Parse reply headers. */
- /* Step 6: Stream reply body. */
- /* Step 7: Read FCGI_END_REQUEST -> Done */
- return HTTP_SERVICE_UNAVAILABLE;
+ return OK;
}
/*