Hi, I've had little free time lately, yet I tried to implement this and it seems to work pretty well.
Details below (and 2.4.x patches attached)...

On Thu, Feb 23, 2017 at 4:38 PM, Yann Ylavic <ylavic....@gmail.com> wrote:
>
> So I'm thinking of another way to achieve the same with the current
> APR_BUCKET_BUFF_SIZE (2 pages) per alloc.
>
> The idea is to have a new apr_allocator_allocv() function which would
> fill an iovec with what's available in the allocator's freelist (i.e.
> spare apr_memnodes) of at least the given min_size bytes (possibly a
> max too, but I don't see the need for now) and up to the size of the
> given iovec.
>
> This function could be the base of a new apr_bucket_allocv() (and
> possibly apr_p[c]allocv(), though out of scope here), which in turn
> could be used by file_bucket_read() to get an iovec of available
> buffers. This iovec could then be passed to a (still new)
> apr_file_readv() based on the readv() syscall, which would allow
> reading much more data in one go.
>
> With this scheme we'd have iovecs from end to end; well, sort of,
> since mod_ssl would break the chain, but it would still produce
> transient buckets on output, which anyway end up in the
> core_output_filter's brigade of set-aside heap buckets, for
> apr_socket_sendv() to finally writev() them.
>
> We'd also have more recycled heap buckets (hence memnodes in the
> allocator) as the core_output_filter retains buckets, all of
> APR_BUCKET_BUFF_SIZE, up to THRESHOLD_MAX_BUFFER which, if made
> configurable, would (along with MaxMemFree) be the real limiter of
> recycling.

So here is how I finally got this; the principles remain.
Changes are (obviously) on both the httpd side and (mainly) the APR side (I haven't proposed it to the APR team yet, I wanted some feedback here first ;)

On the httpd side, it's mainly the parametrization of some hardcoded values in the core output filter, namely:

+AP_INIT_TAKE1("OutputFlushThreshold", set_output_flush_threshold, NULL, RSRC_CONF,
+  "Size (in bytes) above which pending data are flushed to the network"),
+AP_INIT_TAKE1("OutputFlushMemThreshold", set_output_flush_mem_threshold, NULL, RSRC_CONF,
+  "Size (in bytes) above which pending memory data are flushed to the network"),
+AP_INIT_TAKE1("OutputFlushMaxPipelined", set_output_flush_max_pipelined, NULL, RSRC_CONF,
+  "Number of pipelined/pending responses above which they are flushed to the network"),

These were respectively THRESHOLD_MIN_WRITE, THRESHOLD_MAX_BUFFER and MAX_REQUESTS_IN_PIPELINE. The iovec previously used (on the stack) is removed, in favor of a per-connection (ctx) dynamic/growable one, which gets rid of an artificial/hardcoded flush limit (MAX_IOVEC_TO_WRITE). Hence the flush is now solely governed by either the overall pending bytes (OutputFlushThreshold), the in-memory pending bytes (OutputFlushMemThreshold, which catches the former MAX_IOVEC_TO_WRITE case), or the number of pipelined requests (OutputFlushMaxPipelined).

Also, to reach these thresholds, there is a way to enable scattered reads (i.e. readv) on files (à la EnableMMAP/EnableSendfile), with a tunable buffer size:

+AP_INIT_TAKE1("EnableReadfileScatter", set_enable_readfile_scatter, NULL, OR_FILEINFO,
+  "Controls whether buffer scattering may be used to read files ('off', 'on', 'fixed')"),
+AP_INIT_TAKE1("ReadfileBufferSize", set_readfile_buf_size, NULL, OR_FILEINFO,
+  "Size (in bytes) of the memory buffers used to read files"),

These are set in the core_handler only (for now), and that's where APR comes into play :)

Currently I have good results (with gdb/LOG_TRACE, no stress test yet ;)
For "http:" (main server) with:

  EnableMMAP off
  EnableSendfile off
  EnableReadfileScatter on
  #ReadfileBufferSize 8192 <= default
  OutputFlushThreshold 1048576
  OutputFlushMemThreshold 65536
  OutputFlushMaxPipelined 5

And for "https:" (vhost) with overridden:

  EnableReadfileScatter fixed
  ReadfileBufferSize 16384

Not heavily tested, but the traces look good :p

So on the APR side there is apr_file_readv(), obviously (implemented with readv() only for now, i.e. APR_ENOTIMPL on Windows and OS/2, which seem to be the only platforms that don't share the unix fileio code); it is used by apr_bucket_read()::file_bucket_read() to fill its buffers and morph/chain heap buckets. The buffers come from an apr_bucket_alloc_t, hence the new apr_bucket_bulk_alloc(), apr_bucket_bulk_free() and apr_allocator_bulk_alloc() functions, where all the "automagic" finally lives. With apr_allocator_bulk_alloc(), one can request several apr_memnode_t of a fixed (optional) or minimal given size, and in the worst case get a single (allocated) one, or in the best case as many free ones as are available (within a maximal size, also given). Since this is old and sensitive code, I tried to be as non-invasive as possible, while staying efficient :)

Regarding apr_bucket_bulk_alloc() (and its apr_bucket_bulk_free() counterpart), they are mainly wrappers around apr_allocator_bulk_alloc(), producing an iovec of memory blocks from a list of apr_memnode_t (resp. freeing them in one go).
That is all (sorry if it was too long or too short); there are some comments in the code, and comments / questions / tests / bug{reports,fixes} are welcome ;)

Regards,
Yann.
Index: include/http_core.h =================================================================== --- include/http_core.h (revision 1783852) +++ include/http_core.h (working copy) @@ -616,6 +616,13 @@ typedef struct { #define ENABLE_SENDFILE_UNSET (2) unsigned int enable_sendfile : 2; /* files in this dir can be sendfile'ed */ +#define ENABLE_READFILE_SCATTER_OFF (0) +#define ENABLE_READFILE_SCATTER_ON (1) +#define ENABLE_READFILE_SCATTER_FIXED (2) +#define ENABLE_READFILE_SCATTER_UNSET (3) + unsigned int enable_readfile_scatter : 2; /* whether files in this dir can + be scattered on read (readv) */ + #define USE_CANONICAL_PHYS_PORT_OFF (0) #define USE_CANONICAL_PHYS_PORT_ON (1) #define USE_CANONICAL_PHYS_PORT_UNSET (2) @@ -672,6 +679,8 @@ typedef struct { /** Table of rules for building CGI variables, NULL if none configured */ apr_hash_t *cgi_var_rules; + + apr_size_t readfile_buf_size; } core_dir_config; /* macro to implement off by default behaviour */ @@ -741,6 +750,9 @@ typedef struct { #define AP_HTTP_METHODS_REGISTERED 2 char http_methods; + apr_size_t output_flush_threshold; + apr_size_t output_flush_mem_threshold; + apr_uint32_t output_flush_max_pipelined; } core_server_config; /* for AddOutputFiltersByType in core.c */ Index: server/core.c =================================================================== --- server/core.c (revision 1783852) +++ server/core.c (working copy) @@ -80,6 +80,10 @@ #define AP_CONTENT_MD5_ON 1 #define AP_CONTENT_MD5_UNSET 2 +#define AP_FLUSH_THRESHOLD 4096 +#define AP_FLUSH_MEM_THRESHOLD 65536 +#define AP_FLUSH_MAX_PIPELINED 5 + APR_HOOK_STRUCT( APR_HOOK_LINK(get_mgmt_items) APR_HOOK_LINK(insert_network_bucket) @@ -183,6 +187,7 @@ static void *create_core_dir_config(apr_pool_t *a, conf->enable_mmap = ENABLE_MMAP_UNSET; conf->enable_sendfile = ENABLE_SENDFILE_UNSET; + conf->enable_readfile_scatter = ENABLE_READFILE_SCATTER_UNSET; conf->allow_encoded_slashes = 0; conf->decode_encoded_slashes = 0; @@ -390,6 +395,20 @@ static void 
*merge_core_dir_configs(apr_pool_t *a, conf->enable_sendfile = new->enable_sendfile; } + if (new->enable_readfile_scatter != ENABLE_READFILE_SCATTER_UNSET) { + conf->enable_readfile_scatter = new->enable_readfile_scatter; + } + else { + conf->enable_readfile_scatter = base->enable_readfile_scatter; + } + + if (new->readfile_buf_size) { + conf->readfile_buf_size = new->readfile_buf_size; + } + else { + conf->readfile_buf_size = base->readfile_buf_size; + } + conf->allow_encoded_slashes = new->allow_encoded_slashes; conf->decode_encoded_slashes = new->decode_encoded_slashes; @@ -461,6 +480,10 @@ static void *create_core_server_config(apr_pool_t apr_table_setn(conf->accf_map, "http", "data"); apr_table_setn(conf->accf_map, "https", "data"); #endif + + conf->output_flush_threshold = AP_FLUSH_THRESHOLD; + conf->output_flush_mem_threshold = AP_FLUSH_MEM_THRESHOLD; + conf->output_flush_max_pipelined = AP_FLUSH_MAX_PIPELINED; } /* pcalloc'ed - we have NULL's/0's else ** is_virtual ** { @@ -554,6 +577,13 @@ static void *merge_core_server_configs(apr_pool_t conf->protocols_honor_order = ((virt->protocols_honor_order < 0)? 
base->protocols_honor_order : virt->protocols_honor_order); + + if (virt->output_flush_threshold) + conf->output_flush_threshold = virt->output_flush_threshold; + if (virt->output_flush_mem_threshold) + conf->output_flush_mem_threshold = virt->output_flush_mem_threshold; + if (virt->output_flush_max_pipelined) + conf->output_flush_max_pipelined = virt->output_flush_max_pipelined; return conf; } @@ -2208,7 +2238,107 @@ static const char *set_enable_sendfile(cmd_parms * return NULL; } +static const char *set_enable_readfile_scatter(cmd_parms *cmd, void *d_, + const char *arg) +{ + core_dir_config *d = d_; + if (strcasecmp(arg, "on") == 0) { + d->enable_readfile_scatter = ENABLE_READFILE_SCATTER_ON; + } + else if (strcasecmp(arg, "fixed") == 0) { + d->enable_readfile_scatter = ENABLE_READFILE_SCATTER_FIXED; + } + else if (strcasecmp(arg, "off") == 0) { + d->enable_readfile_scatter = ENABLE_READFILE_SCATTER_OFF; + } + else { + return "parameter must be 'on', 'off' or 'fixed'"; + } + + return NULL; +} + +static const char *set_readfile_buf_size(cmd_parms *cmd, void *d_, + const char *arg) +{ + core_dir_config *d = d_; + apr_off_t size; + char *end; + + if (apr_strtoff(&size, arg, &end, 10) + || size < 0 || size > APR_SIZE_MAX || *end) + return apr_pstrcat(cmd->pool, + "parameter must be a number between 0 and " + APR_STRINGIFY(APR_SIZE_MAX) ": ", + arg, NULL); + + d->readfile_buf_size = (apr_size_t)size; + + return NULL; +} + +static const char *set_output_flush_threshold(cmd_parms *cmd, void *d_, + const char *arg) +{ + core_server_config *conf = + ap_get_core_module_config(cmd->server->module_config); + apr_off_t size; + char *end; + + if (apr_strtoff(&size, arg, &end, 10) + || size <= 0 || size > APR_SIZE_MAX || *end) + return apr_pstrcat(cmd->pool, + "parameter must be a number between 1 and " + APR_STRINGIFY(APR_SIZE_MAX) ": ", + arg, NULL); + + conf->output_flush_threshold = (apr_size_t)size; + + return NULL; +} + +static const char
*set_output_flush_mem_threshold(cmd_parms *cmd, void *d_, + const char *arg) +{ + core_server_config *conf = + ap_get_core_module_config(cmd->server->module_config); + apr_off_t size; + char *end; + + if (apr_strtoff(&size, arg, &end, 10) + || size <= 0 || size > APR_SIZE_MAX || *end) + return apr_pstrcat(cmd->pool, + "parameter must be a number between 1 and " + APR_STRINGIFY(APR_SIZE_MAX) ": ", + arg, NULL); + + conf->output_flush_mem_threshold = (apr_size_t)size; + + return NULL; +} + +static const char *set_output_flush_max_pipelined(cmd_parms *cmd, void *d_, + const char *arg) +{ + core_server_config *conf = + ap_get_core_module_config(cmd->server->module_config); + apr_off_t num; + char *end; + + if (apr_strtoff(&num, arg, &end, 10) + || num < 0 || num > APR_UINT32_MAX || *end) + return apr_pstrcat(cmd->pool, + "parameter must be a number between 0 and " + APR_STRINGIFY(APR_UINT32_MAX) ": ", + arg, NULL); + + conf->output_flush_max_pipelined = (apr_uint32_t)num; + + return NULL; +} + + /* * Report a missing-'>' syntax error.
*/ @@ -4300,6 +4430,16 @@ AP_INIT_TAKE1("EnableMMAP", set_enable_mmap, NULL, "Controls whether memory-mapping may be used to read files"), AP_INIT_TAKE1("EnableSendfile", set_enable_sendfile, NULL, OR_FILEINFO, "Controls whether sendfile may be used to transmit files"), +AP_INIT_TAKE1("EnableReadfileScatter", set_enable_readfile_scatter, NULL, OR_FILEINFO, + "Controls whether buffer scattering may be used to read files ('off', 'on', 'fixed')"), +AP_INIT_TAKE1("ReadfileBufferSize", set_readfile_buf_size, NULL, OR_FILEINFO, + "Size (in bytes) of the memory buffers used to read files"), +AP_INIT_TAKE1("OutputFlushThreshold", set_output_flush_threshold, NULL, RSRC_CONF, + "Size (in bytes) above which pending data are flushed to the network"), +AP_INIT_TAKE1("OutputFlushMemThreshold", set_output_flush_mem_threshold, NULL, RSRC_CONF, + "Size (in bytes) above which pending memory data are flushed to the network"), +AP_INIT_TAKE1("OutputFlushMaxPipelined", set_output_flush_max_pipelined, NULL, RSRC_CONF, + "Number of pipelined/pending responses above which they are flushed to the network"), /* Old server config file commands */ @@ -4738,6 +4878,15 @@ static int default_handler(request_rec *r) (void)apr_bucket_file_enable_mmap(e, 0); } #endif + if (d->enable_readfile_scatter == ENABLE_READFILE_SCATTER_ON) { + apr_bucket_file_enable_scatter(e, APR_BUCKET_FILE_SCATTER_ON); + } + else if (d->enable_readfile_scatter == ENABLE_READFILE_SCATTER_FIXED) { + apr_bucket_file_enable_scatter(e, APR_BUCKET_FILE_SCATTER_FIXED); + } + if (d->readfile_buf_size != 0) { + apr_bucket_file_read_size_set(e, d->readfile_buf_size); + } } e = apr_bucket_eos_create(c->bucket_alloc); Index: server/core_filters.c =================================================================== --- server/core_filters.c (revision 1783852) +++ server/core_filters.c (working copy) @@ -78,10 +78,13 @@ do { \ #define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX struct core_output_filter_ctx { + core_server_config *conf; 
apr_bucket_brigade *buffered_bb; apr_bucket_brigade *tmp_flush_bb; apr_pool_t *deferred_write_pool; apr_size_t bytes_written; + struct iovec *vec; + apr_size_t nvec; }; struct core_filter_ctx { @@ -335,7 +338,7 @@ static void setaside_remaining_output(ap_filter_t static apr_status_t send_brigade_nonblocking(apr_socket_t *s, apr_bucket_brigade *bb, - apr_size_t *bytes_written, + core_output_filter_ctx_t *ctx, conn_rec *c); static void remove_empty_buckets(apr_bucket_brigade *bb); @@ -342,27 +345,22 @@ static void remove_empty_buckets(apr_bucket_brigad static apr_status_t send_brigade_blocking(apr_socket_t *s, apr_bucket_brigade *bb, - apr_size_t *bytes_written, + core_output_filter_ctx_t *ctx, conn_rec *c); static apr_status_t writev_nonblocking(apr_socket_t *s, - struct iovec *vec, apr_size_t nvec, apr_bucket_brigade *bb, - apr_size_t *cumulative_bytes_written, + core_output_filter_ctx_t *ctx, + apr_size_t nvec, conn_rec *c); #if APR_HAS_SENDFILE static apr_status_t sendfile_nonblocking(apr_socket_t *s, apr_bucket *bucket, - apr_size_t *cumulative_bytes_written, + core_output_filter_ctx_t *ctx, conn_rec *c); #endif -/* XXX: Should these be configurable parameters? */ -#define THRESHOLD_MIN_WRITE 4096 -#define THRESHOLD_MAX_BUFFER 65536 -#define MAX_REQUESTS_IN_PIPELINE 5 - /* Optional function coming from mod_logio, used for logging of output * traffic */ @@ -373,10 +371,13 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, conn_rec *c = f->c; core_net_rec *net = f->ctx; core_output_filter_ctx_t *ctx = net->out_ctx; + core_server_config *conf = + ap_get_core_module_config(c->base_server->module_config); apr_bucket_brigade *bb = NULL; apr_bucket *bucket, *next, *flush_upto = NULL; apr_size_t bytes_in_brigade, non_file_bytes_in_brigade; - int eor_buckets_in_brigade, morphing_bucket_in_brigade; + apr_uint32_t eor_buckets_in_brigade; + int morphing_bucket_in_brigade; apr_status_t rv; /* Fail quickly if the connection has already been aborted. 
*/ @@ -390,6 +391,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, if (ctx == NULL) { ctx = apr_pcalloc(c->pool, sizeof(*ctx)); net->out_ctx = (core_output_filter_ctx_t *)ctx; + ctx->conf = conf; /* * Need to create tmp brigade with correct lifetime. Passing * NULL to apr_brigade_split_ex would result in a brigade @@ -434,16 +436,16 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, * of everything up that point. * * b) The request is in CONN_STATE_HANDLER state, and the brigade - * contains at least THRESHOLD_MAX_BUFFER bytes in non-file + * contains at least output_flush_mem_threshold bytes in non-file * buckets: Do blocking writes until the amount of data in the - * buffer is less than THRESHOLD_MAX_BUFFER. (The point of this + * buffer is less than output_flush_mem_threshold. (The point of this * rule is to provide flow control, in case a handler is * streaming out lots of data faster than the data can be * sent to the client.) * * c) The request is in CONN_STATE_HANDLER state, and the brigade - * contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets: - * Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR + * contains at least output_flush_max_pipelined EOR buckets: + * Do blocking writes until less than output_flush_max_pipelined EOR * buckets are left. (The point of this rule is to prevent too many * FDs being kept open by pipelined requests, possibly allowing a * DoS). @@ -450,7 +452,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, * * d) The brigade contains a morphing bucket: If there was no other * reason to do a blocking write yet, try reading the bucket. If its - * contents fit into memory before THRESHOLD_MAX_BUFFER is reached, + * contents fit into memory before output_flush_mem_threshold is reached, * everything is fine. Otherwise we need to do a blocking write the * up to and including the morphing bucket, because ap_save_brigade() * would read the whole bucket into memory later on. 
@@ -459,14 +461,13 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, * by rules 2a-d. The point of doing only one flush is to make as * few calls to writev() as possible. * - * 4) If the brigade contains at least THRESHOLD_MIN_WRITE + * 4) If the brigade contains at least output_flush_threshold * bytes: Do a nonblocking write of as much data as possible, * then save the rest in ctx->buffered_bb. */ if (new_bb == NULL) { - rv = send_brigade_nonblocking(net->client_socket, bb, - &(ctx->bytes_written), c); + rv = send_brigade_nonblocking(net->client_socket, bb, ctx, c); if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) { /* The client has aborted the connection */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, @@ -481,8 +482,8 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, bytes_in_brigade = 0; non_file_bytes_in_brigade = 0; + morphing_bucket_in_brigade = 0; eor_buckets_in_brigade = 0; - morphing_bucket_in_brigade = 0; for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb); bucket = next) { @@ -508,18 +509,20 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, } if (APR_BUCKET_IS_FLUSH(bucket) - || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER || morphing_bucket_in_brigade - || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) { + || non_file_bytes_in_brigade >= conf->output_flush_mem_threshold + || eor_buckets_in_brigade > conf->output_flush_max_pipelined) { /* this segment of the brigade MUST be sent before returning. */ if (APLOGctrace6(c)) { char *reason = APR_BUCKET_IS_FLUSH(bucket) ? "FLUSH bucket" : - (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ? - "THRESHOLD_MAX_BUFFER" : - morphing_bucket_in_brigade ? "morphing bucket" : - "MAX_REQUESTS_IN_PIPELINE"; + morphing_bucket_in_brigade ? + "morphing bucket" : + (non_file_bytes_in_brigade >= + conf->output_flush_mem_threshold) ? 
+ "mem threshold" : + "max pipelined"; ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c, "core_output_filter: flushing because of %s", reason); @@ -539,8 +542,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, if (flush_upto != NULL) { ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto, ctx->tmp_flush_bb); - rv = send_brigade_blocking(net->client_socket, bb, - &(ctx->bytes_written), c); + rv = send_brigade_blocking(net->client_socket, bb, ctx, c); if (rv != APR_SUCCESS) { /* The client has aborted the connection */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, @@ -552,9 +554,8 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb); } - if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) { - rv = send_brigade_nonblocking(net->client_socket, bb, - &(ctx->bytes_written), c); + if (bytes_in_brigade >= conf->output_flush_threshold) { + rv = send_brigade_nonblocking(net->client_socket, bb, ctx, c); if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) { /* The client has aborted the connection */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, @@ -603,24 +604,26 @@ static void setaside_remaining_output(ap_filter_t } #ifndef APR_MAX_IOVEC_SIZE -#define MAX_IOVEC_TO_WRITE 16 +#define MIN_IOVEC_SIZE 16 +#define MAX_IOVEC_SIZE MIN_IOVEC_SIZE #else #if APR_MAX_IOVEC_SIZE > 16 -#define MAX_IOVEC_TO_WRITE 16 +#define MIN_IOVEC_SIZE 16 #else -#define MAX_IOVEC_TO_WRITE APR_MAX_IOVEC_SIZE +#define MIN_IOVEC_SIZE APR_MAX_IOVEC_SIZE #endif +#define MAX_IOVEC_SIZE APR_MAX_IOVEC_SIZE #endif static apr_status_t send_brigade_nonblocking(apr_socket_t *s, apr_bucket_brigade *bb, - apr_size_t *bytes_written, + core_output_filter_ctx_t *ctx, conn_rec *c) { apr_bucket *bucket, *next; apr_status_t rv; - struct iovec vec[MAX_IOVEC_TO_WRITE]; apr_size_t nvec = 0; + apr_size_t bytes_in_flight = 0; remove_empty_buckets(bb); @@ -642,13 +645,13 @@ static apr_status_t send_brigade_nonblocking(apr_s (bucket->length >= AP_MIN_SENDFILE_BYTES)) { if (nvec > 0) 
{ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1); - rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); + rv = writev_nonblocking(s, bb, ctx, nvec, c); if (rv != APR_SUCCESS) { (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); return rv; } } - rv = sendfile_nonblocking(s, bucket, bytes_written, c); + rv = sendfile_nonblocking(s, bucket, ctx, c); if (nvec > 0) { (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); nvec = 0; @@ -671,8 +674,8 @@ static apr_status_t send_brigade_nonblocking(apr_s if (APR_STATUS_IS_EAGAIN(rv)) { /* Read would block; flush any pending data and retry. */ if (nvec) { - rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); - if (rv) { + rv = writev_nonblocking(s, bb, ctx, nvec, c); + if (rv != APR_SUCCESS) { return rv; } nvec = 0; @@ -686,22 +689,50 @@ static apr_status_t send_brigade_nonblocking(apr_s /* reading may have split the bucket, so recompute next: */ next = APR_BUCKET_NEXT(bucket); - vec[nvec].iov_base = (char *)data; - vec[nvec].iov_len = length; + + if (nvec >= ctx->nvec) { + if (ctx->nvec < MAX_IOVEC_SIZE) { + struct iovec *newv; + apr_size_t newn = ctx->nvec * 2; + if (newn < MIN_IOVEC_SIZE) { + newn = MIN_IOVEC_SIZE; + } + else if (newn > MAX_IOVEC_SIZE) { + newn = MAX_IOVEC_SIZE; + } + newv = apr_palloc(c->pool, newn * sizeof(*newv)); + if (ctx->nvec) { + memcpy(newv, ctx->vec, ctx->nvec * sizeof(*newv)); + } + ctx->nvec = newn; + ctx->vec = newv; + } + else { + rv = writev_nonblocking(s, bb, ctx, nvec, c); + if (rv != APR_SUCCESS) { + return rv; + } + nvec = 0; + } + } + ctx->vec[nvec].iov_base = (char *)data; + ctx->vec[nvec].iov_len = length; nvec++; - if (nvec == MAX_IOVEC_TO_WRITE) { - rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); - nvec = 0; + + bytes_in_flight += length; + if (bytes_in_flight >= ctx->conf->output_flush_mem_threshold) { + rv = writev_nonblocking(s, bb, ctx, nvec, c); if (rv != APR_SUCCESS) { return rv; } - break; + bytes_in_flight = 0; + nvec = 0; } } } if (nvec > 0) { - rv = 
writev_nonblocking(s, vec, nvec, bb, bytes_written, c); + rv = writev_nonblocking(s, bb, ctx, nvec, c); if (rv != APR_SUCCESS) { return rv; } @@ -723,7 +754,7 @@ static void remove_empty_buckets(apr_bucket_brigad static apr_status_t send_brigade_blocking(apr_socket_t *s, apr_bucket_brigade *bb, - apr_size_t *bytes_written, + core_output_filter_ctx_t *ctx, conn_rec *c) { apr_status_t rv; @@ -730,7 +761,7 @@ static apr_status_t send_brigade_blocking(apr_sock rv = APR_SUCCESS; while (!APR_BRIGADE_EMPTY(bb)) { - rv = send_brigade_nonblocking(s, bb, bytes_written, c); + rv = send_brigade_nonblocking(s, bb, ctx, c); if (rv != APR_SUCCESS) { if (APR_STATUS_IS_EAGAIN(rv)) { /* Wait until we can send more data */ @@ -759,9 +790,9 @@ static apr_status_t send_brigade_blocking(apr_sock } static apr_status_t writev_nonblocking(apr_socket_t *s, - struct iovec *vec, apr_size_t nvec, apr_bucket_brigade *bb, - apr_size_t *cumulative_bytes_written, + core_output_filter_ctx_t *ctx, + apr_size_t nvec, conn_rec *c) { apr_status_t rv = APR_SUCCESS, arv; @@ -768,6 +799,7 @@ static apr_status_t writev_nonblocking(apr_socket_ apr_size_t bytes_written = 0, bytes_to_write = 0; apr_size_t i, offset; apr_interval_time_t old_timeout; + struct iovec *vec = ctx->vec; arv = apr_socket_timeout_get(s, &old_timeout); if (arv != APR_SUCCESS) { @@ -813,8 +845,12 @@ static apr_status_t writev_nonblocking(apr_socket_ if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) { ap__logio_add_bytes_out(c, bytes_written); } - *cumulative_bytes_written += bytes_written; + ctx->bytes_written += bytes_written; + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, rv, c, + "writev_nonblocking(): %"APR_SIZE_T_FMT"/%"APR_SIZE_T_FMT" bytes", + bytes_written, bytes_to_write); + arv = apr_socket_timeout_set(s, old_timeout); if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) { return arv; @@ -828,7 +864,7 @@ static apr_status_t writev_nonblocking(apr_socket_ static apr_status_t sendfile_nonblocking(apr_socket_t *s, apr_bucket 
*bucket, - apr_size_t *cumulative_bytes_written, + core_output_filter_ctx_t *ctx, conn_rec *c) { apr_status_t rv = APR_SUCCESS; @@ -875,7 +911,12 @@ static apr_status_t sendfile_nonblocking(apr_socke if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) { ap__logio_add_bytes_out(c, bytes_written); } - *cumulative_bytes_written += bytes_written; + ctx->bytes_written += bytes_written; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, rv, c, + "sendfile_nonblocking(): %"APR_SIZE_T_FMT"/%"APR_OFF_T_FMT" bytes", + bytes_written, file_length); + if ((bytes_written < file_length) && (bytes_written > 0)) { apr_bucket_split(bucket, bytes_written); apr_bucket_delete(bucket); Index: modules/ssl/ssl_engine_io.c =================================================================== --- modules/ssl/ssl_engine_io.c (revision 1783852) +++ modules/ssl/ssl_engine_io.c (working copy) @@ -152,6 +152,9 @@ static int bio_filter_out_flush(BIO *bio) bio_filter_out_ctx_t *outctx = (bio_filter_out_ctx_t *)(bio->ptr); apr_bucket *e; + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, outctx->c, + "bio_filter_out_write(): flush"); + AP_DEBUG_ASSERT(APR_BRIGADE_EMPTY(outctx->bb)); e = apr_bucket_flush_create(outctx->bb->bucket_alloc); @@ -200,6 +203,9 @@ static int bio_filter_out_write(BIO *bio, const ch return -1; } + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, outctx->c, + "bio_filter_out_write(): %d bytes", inl); + /* when handshaking we'll have a small number of bytes. * max size SSL will pass us here is about 16k. * (16413 bytes to be exact) @@ -784,6 +790,9 @@ static apr_status_t ssl_filter_write(ap_filter_t * return APR_EGENERAL; } + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c, + "ssl_filter_write(): %"APR_SIZE_T_FMT" bytes", len); + /* We rely on SSL_get_error() after the write, which requires an empty error * queue before the write in order to work properly. 
*/ @@ -1570,8 +1579,11 @@ static apr_status_t ssl_io_filter_coalesce(ap_filt && (ctx == NULL || bytes + ctx->bytes + e->length < COALESCE_BYTES); e = APR_BUCKET_NEXT(e)) { - if (e->length) count++; /* don't count zero-length buckets */ - bytes += e->length; + /* don't count zero-length buckets */ + if (e->length) { + bytes += e->length; + count++; + } } upto = e;
Index: srclib/apr/include/apr_allocator.h =================================================================== --- srclib/apr/include/apr_allocator.h (revision 1763844) +++ srclib/apr/include/apr_allocator.h (working copy) @@ -93,6 +93,26 @@ APR_DECLARE(apr_memnode_t *) apr_allocator_alloc(a apr_size_t size) __attribute__((nonnull(1))); +/* Get free nodes of [@a total_size:@a block_size] bytes, or allocate + * one of @a block_size bytes if none is found, putting the returned + * number of nodes in @a num and the number of bytes in @a size. + * @param a The allocator to allocate from + * @param block_size The minimum size of a mem block (excluding the memnode + * structure) + * @param total_size The maximum overall size to get on input, the gotten + * size on output (excluding the memnode structure for each + * block) + * @param blocks_num The number of nodes returned in the list (can be NULL) + * @param blocks_fixed Whether all blocks should have a fixed size (i.e. + * @a block_size rounded up to boundary) + */ +APR_DECLARE(apr_memnode_t *) apr_allocator_bulk_alloc(apr_allocator_t *a, + apr_size_t block_size, + apr_size_t *total_size, + apr_size_t *blocks_num, + int blocks_fixed) + __attribute__((nonnull(1,3))); + /** * Free a list of blocks of mem, giving them back to the allocator. 
* The list is typically terminated by a memnode with its next field @@ -104,6 +124,13 @@ APR_DECLARE(void) apr_allocator_free(apr_allocator apr_memnode_t *memnode) __attribute__((nonnull(1,2))); +/** + * Get the aligned size corresponding to the requested size + * @param size The size to align + * @return The aligned size + */ +APR_DECLARE(apr_size_t) apr_allocator_align(apr_size_t size); + #include "apr_pools.h" /** Index: srclib/apr/memory/unix/apr_pools.c =================================================================== --- srclib/apr/memory/unix/apr_pools.c (revision 1763844) +++ srclib/apr/memory/unix/apr_pools.c (working copy) @@ -111,11 +111,16 @@ struct apr_allocator_t { * slot 19: size 81920 */ apr_memnode_t *free[MAX_INDEX]; + /** Largest used index into free[0], always >= MAX_INDEX */ + apr_uint32_t max_index_oversized; }; #define SIZEOF_ALLOCATOR_T APR_ALIGN_DEFAULT(sizeof(apr_allocator_t)) +/* Returns the amount of free space in the given node. */ +#define node_free_space(node_) ((apr_size_t)(node_->endp - node_->first_avail)) + /* * Allocator */ @@ -209,38 +214,21 @@ APR_DECLARE(void) apr_allocator_max_free_set(apr_a #endif } -static APR_INLINE -apr_memnode_t *allocator_alloc(apr_allocator_t *allocator, apr_size_t in_size) +static +apr_memnode_t *allocator_alloc_index(apr_allocator_t *allocator, + apr_uint32_t index, apr_size_t size, + int lock, int free, int fixed) { apr_memnode_t *node, **ref; apr_uint32_t max_index; - apr_size_t size, i, index; + apr_size_t i; - /* Round up the block size to the next boundary, but always - * allocate at least a certain size (MIN_ALLOC). 
- */ - size = APR_ALIGN(in_size + APR_MEMNODE_T_SIZE, BOUNDARY_SIZE); - if (size < in_size) { - return NULL; - } - if (size < MIN_ALLOC) - size = MIN_ALLOC; - - /* Find the index for this node size by - * dividing its size by the boundary size - */ - index = (size >> BOUNDARY_INDEX) - 1; - - if (index > APR_UINT32_MAX) { - return NULL; - } - /* First see if there are any nodes in the area we know * our node will fit into. */ if (index <= allocator->max_index) { #if APR_HAS_THREADS - if (allocator->mutex) + if (lock) apr_thread_mutex_lock(allocator->mutex); #endif /* APR_HAS_THREADS */ @@ -257,9 +245,11 @@ APR_DECLARE(void) apr_allocator_max_free_set(apr_a max_index = allocator->max_index; ref = &allocator->free[index]; i = index; - while (*ref == NULL && i < max_index) { - ref++; - i++; + if (!fixed) { + while (*ref == NULL && i < max_index) { + ref++; + i++; + } } if ((node = *ref) != NULL) { @@ -283,7 +273,7 @@ APR_DECLARE(void) apr_allocator_max_free_set(apr_a allocator->current_free_index = allocator->max_free_index; #if APR_HAS_THREADS - if (allocator->mutex) + if (lock) apr_thread_mutex_unlock(allocator->mutex); #endif /* APR_HAS_THREADS */ @@ -294,7 +284,7 @@ APR_DECLARE(void) apr_allocator_max_free_set(apr_a } #if APR_HAS_THREADS - if (allocator->mutex) + if (lock) apr_thread_mutex_unlock(allocator->mutex); #endif /* APR_HAS_THREADS */ } @@ -303,8 +293,9 @@ APR_DECLARE(void) apr_allocator_max_free_set(apr_a * it is not empty. */ else if (allocator->free[0]) { + apr_memnode_t *prev = NULL; #if APR_HAS_THREADS - if (allocator->mutex) + if (lock) apr_thread_mutex_lock(allocator->mutex); #endif /* APR_HAS_THREADS */ @@ -313,17 +304,19 @@ APR_DECLARE(void) apr_allocator_max_free_set(apr_a */ ref = &allocator->free[0]; while ((node = *ref) != NULL && index > node->index) - ref = &node->next; + ref = &(prev = node)->next; - if (node) { + if (node && (!fixed || index == node->index)) { *ref = node->next; + if (!*ref) + allocator->max_index_oversized = prev ? 
prev->index : 0; allocator->current_free_index += node->index + 1; if (allocator->current_free_index > allocator->max_free_index) allocator->current_free_index = allocator->max_free_index; #if APR_HAS_THREADS - if (allocator->mutex) + if (lock) apr_thread_mutex_unlock(allocator->mutex); #endif /* APR_HAS_THREADS */ @@ -334,11 +327,15 @@ APR_DECLARE(void) apr_allocator_max_free_set(apr_a } #if APR_HAS_THREADS - if (allocator->mutex) + if (lock) apr_thread_mutex_unlock(allocator->mutex); #endif /* APR_HAS_THREADS */ } + if (free) { + return NULL; + } + /* If we haven't got a suitable node, malloc a new one * and initialize it. */ @@ -359,6 +356,163 @@ APR_DECLARE(void) apr_allocator_max_free_set(apr_a } static APR_INLINE +apr_size_t allocator_align(apr_size_t in_size) +{ + apr_size_t size = in_size; + + /* Round up the block size to the next boundary, but always + * allocate at least a certain size (MIN_ALLOC). + */ + size = APR_ALIGN(size + APR_MEMNODE_T_SIZE, BOUNDARY_SIZE); + if (size < in_size) { + return 0; + } + if (size < MIN_ALLOC) { + size = MIN_ALLOC; + } + + return size; +} + +static APR_INLINE +apr_memnode_t *allocator_alloc(apr_allocator_t *allocator, apr_size_t in_size) +{ + apr_size_t size; + apr_uint32_t index; +#if APR_HAS_THREADS + const int lock = !!allocator->mutex; +#else + const int lock = 0; +#endif + + + size = allocator_align(in_size); + if (!size) { + return NULL; + } + + /* Find the index for this node size by + * dividing its size by the boundary size + */ + index = (size >> BOUNDARY_INDEX) - 1; + if (index > APR_UINT32_MAX) { + return NULL; + } + + return allocator_alloc_index(allocator, index, size, lock, 0, 0); +} + +static APR_INLINE +apr_memnode_t *allocator_bulk_alloc(apr_allocator_t *a, + apr_size_t block_size, + apr_size_t *total_size, + apr_size_t *blocks_num, + int blocks_fixed) +{ + apr_memnode_t *node, *nodes = NULL, *last = NULL; + apr_size_t size, len = *total_size, pos = 0, num = 0; + apr_uint32_t index; + + *total_size = 0; 
+ if (blocks_num) { + *blocks_num = 0; + } + + size = allocator_align(block_size); + if (!size) { + return NULL; + } + + /* Find the index for the block size by + * dividing its size by the boundary size + */ + index = (size >> BOUNDARY_INDEX) - 1; + if (index > APR_UINT32_MAX) { + return NULL; + } + + /* Sanity checks */ + if (len < size) { + len = size; + } + else if (len > APR_SIZE_MAX - APR_MEMNODE_T_SIZE) { + len = APR_SIZE_MAX - APR_MEMNODE_T_SIZE; + } + +#if APR_HAS_THREADS + if (a->mutex) + apr_thread_mutex_lock(a->mutex); +#endif /* APR_HAS_THREADS */ + + /* Acquire free blocks with an index equal to (or greater + * than in !blocks_fixed mode) the block index. + */ + if (blocks_fixed) { + for (; pos < len; pos += node_free_space(node), ++num) { + node = allocator_alloc_index(a, index, size, 0, 1, 1); + if (!node) { + break; + } + node->next = nodes; + nodes = node; + } + } + else { + /* Find the largest possible nodes based on the remaining size */ + for (; pos < len; pos += node_free_space(node), ++num) { + apr_size_t n = allocator_align(len - pos); + apr_uint32_t i = (n >> BOUNDARY_INDEX) - 1; + + /* Enough ? */ + if (i < index) { + break; + } + + /* Cap to a free index */ + if (i > a->max_index) { + if (i < MAX_INDEX) { + i = a->max_index; + } + else if (i > a->max_index_oversized) { + i = a->max_index_oversized; + } + if (!i) { + break; + } + } + + /* Can't fail now, queue last (i.e. 
largest first) */ + node = allocator_alloc_index(a, i, n, 0, 1, 0); + if (last) { + last = last->next = node; + } + else { + nodes = last = node; + } + } + } + + /* Alloc a single node when no (free) one is available above */ + if (!nodes && (nodes = allocator_alloc_index(a, index, size, 0, 0, + blocks_fixed))) { + pos += node_free_space(nodes); + num++; + } + +#if APR_HAS_THREADS + if (a->mutex) + apr_thread_mutex_unlock(a->mutex); +#endif /* APR_HAS_THREADS */ + + *total_size = pos; + if (blocks_num) { + *blocks_num = num; + } + + return nodes; +} + +static APR_INLINE void allocator_free(apr_allocator_t *allocator, apr_memnode_t *node) { apr_memnode_t *next, *freelist = NULL; @@ -402,10 +556,15 @@ void allocator_free(apr_allocator_t *allocator, ap } else { /* This node is too large to keep in a specific size bucket, - * just add it to the sink (at index 0). + * just add it to the sink (at index 0), smallest first. */ - node->next = allocator->free[0]; - allocator->free[0] = node; + apr_memnode_t *pos, **ref = &allocator->free[0]; + while ((pos = *ref) != NULL && index > pos->index) + ref = &pos->next; + node->next = pos; + *ref = node; + if (!pos) + allocator->max_index_oversized = index; if (current_free_index >= index + 1) current_free_index -= index + 1; else @@ -438,6 +597,16 @@ APR_DECLARE(apr_memnode_t *) apr_allocator_alloc(a return allocator_alloc(allocator, size); } +APR_DECLARE(apr_memnode_t *) apr_allocator_bulk_alloc(apr_allocator_t *a, + apr_size_t block_size, + apr_size_t *total_size, + apr_size_t *blocks_num, + int blocks_fixed) +{ + return allocator_bulk_alloc(a, block_size, total_size, blocks_num, + blocks_fixed); +} + APR_DECLARE(void) apr_allocator_free(apr_allocator_t *allocator, apr_memnode_t *node) { @@ -444,6 +613,10 @@ APR_DECLARE(void) apr_allocator_free(apr_allocator allocator_free(allocator, node); } +APR_DECLARE(apr_size_t) apr_allocator_align(apr_size_t size) +{ + return allocator_align(size); +} /* @@ -658,9 +831,6 @@ 
APR_DECLARE(void) apr_pool_terminate(void)
     node->next->ref = node->ref;                \
 } while (0)
 
-/* Returns the amount of free space in the given node. */
-#define node_free_space(node_) ((apr_size_t)(node_->endp - node_->first_avail))
-
 /*
  * Memory allocation
  */
Index: srclib/apr-util/include/apr_buckets.h
===================================================================
--- srclib/apr-util/include/apr_buckets.h (revision 1732829)
+++ srclib/apr-util/include/apr_buckets.h (working copy)
@@ -606,6 +606,14 @@ struct apr_bucket_mmap {
 /** @see apr_bucket_file */
 typedef struct apr_bucket_file apr_bucket_file;
 
+/** Bucket file scatter read mode */
+typedef enum {
+    APR_BUCKET_FILE_SCATTER_OFF = 0, /**< No scattering */
+    APR_BUCKET_FILE_SCATTER_ON,      /**< Scatter read in buffers of at
+                                      *   least apr_bucket_file#read_size */
+    APR_BUCKET_FILE_SCATTER_FIXED    /**< Scatter read in buffers of fixed
+                                      *   apr_bucket_file#read_size */
+} apr_bucket_file_scatter_e;
 /**
  * A bucket referring to an file
  */
@@ -622,6 +630,10 @@ struct apr_bucket_file {
      * a caller tries to read from it */
     int can_mmap;
 #endif /* APR_HAS_MMAP */
+    /** @see ::apr_bucket_file_scatter_e */
+    int can_scatter;
+    /** File read block size */
+    apr_size_t read_size;
 };
 
 /** @see apr_bucket_structs */
@@ -969,12 +981,51 @@ APU_DECLARE_NONSTD(void) apr_bucket_alloc_destroy(
 APU_DECLARE_NONSTD(void *) apr_bucket_alloc(apr_size_t size,
                                             apr_bucket_alloc_t *list);
 
 /**
+ * Allocate a vector of memory blocks for use by the buckets.
+ * @param block_size The minimum size to allocate per block.
+ * @param total_size The maximum overall size to allocate (in), then the
+ *                   amount allocated (out).
+ * @param list The allocator from which to allocate the memory.
+ * @param blocks_num The number of blocks in the returned vector (can be NULL).
+ * @param blocks_fixed Whether all blocks should have a fixed size (i.e.
+ *                     @a block_size rounded up to the allocator boundary).
+ * @return The allocated vector and blocks, or NULL on failure.
+ */
+APU_DECLARE_NONSTD(struct iovec *) apr_bucket_bulk_alloc(apr_size_t block_size,
+                                                         apr_size_t *total_size,
+                                                         apr_bucket_alloc_t *list,
+                                                         apr_size_t *blocks_num,
+                                                         int blocks_fixed);
+
+/**
  * Free memory previously allocated with apr_bucket_alloc().
  * @param block The block of memory to be freed.
  */
 APU_DECLARE_NONSTD(void) apr_bucket_free(void *block);
 
+/**
+ * Free the vector of memory blocks previously allocated with
+ * apr_bucket_bulk_alloc().
+ * @param blocks_vec The vector of blocks to be freed.
+ * @param blocks_num The number of blocks in the vector.
+ * @param free_vec Whether or not to free the vector itself (in addition
+ *                 to the blocks).
+ * @remark For performance reasons, this function assumes that the vector
+ *         and all its blocks come from the same allocator.
+ */
+APU_DECLARE_NONSTD(void) apr_bucket_bulk_free(struct iovec *blocks_vec,
+                                              apr_size_t blocks_num,
+                                              int free_vec);
+
+/**
+ * Get the largest size, corresponding to the requested size, such that
+ * the (would-be) allocation stays within the same allocator boundary.
+ * @param size The requested size.
+ * @return The corresponding size.
+ */
+APR_DECLARE(apr_size_t) apr_bucket_alloc_floor(apr_size_t size);
+
+
 /* ***** Bucket Functions ***** */
 
 /**
  * Free the resources used by a bucket.
If multiple buckets refer to @@ -1563,6 +1614,25 @@ APU_DECLARE(apr_bucket *) apr_bucket_file_make(apr APU_DECLARE(apr_status_t) apr_bucket_file_enable_mmap(apr_bucket *b, int enabled); +/** + * Enable or disable scatter mode for a FILE bucket (default is off) + * @param b The bucket + * @param mode One of ::apr_bucket_file_scatter_e + * @return APR_SUCCESS normally, or an error code if the operation fails + */ +APU_DECLARE(apr_status_t) apr_bucket_file_enable_scatter(apr_bucket *b, + apr_bucket_file_scatter_e mode); + +/** + * Set the size of HEAP bucket's buffer allocated by a FILE bucket on read + * when memory-mapping is disabled + * @param b The bucket + * @param size Size of the allocated buffers + * @return APR_SUCCESS normally, or an error code if the operation fails + */ +APU_DECLARE(apr_status_t) apr_bucket_file_read_size_set(apr_bucket *e, + apr_size_t size); + /** @} */ #ifdef __cplusplus } Index: srclib/apr-util/buckets/apr_buckets_alloc.c =================================================================== --- srclib/apr-util/buckets/apr_buckets_alloc.c (revision 1732829) +++ srclib/apr-util/buckets/apr_buckets_alloc.c (working copy) @@ -121,14 +121,49 @@ APU_DECLARE_NONSTD(void) apr_bucket_alloc_destroy( #endif } -APU_DECLARE_NONSTD(void *) apr_bucket_alloc(apr_size_t size, +APR_DECLARE(apr_size_t) apr_bucket_alloc_floor(apr_size_t size) +{ + if (size <= SMALL_NODE_SIZE) { + size = SMALL_NODE_SIZE; + } + else { + if (size < APR_MEMNODE_T_SIZE) { + size = apr_allocator_align(0); + } + else { + size = apr_allocator_align(size - APR_MEMNODE_T_SIZE); + } + size -= APR_MEMNODE_T_SIZE; + } + size -= SIZEOF_NODE_HEADER_T; + return size; +} + +static APR_INLINE +void apr_bucket_abort(apr_bucket_alloc_t *list) +{ + if (list->pool) { + apr_abortfunc_t fn = apr_pool_abort_get(list->pool); + if (fn) { + fn(APR_ENOMEM); + } + } +} + +APU_DECLARE_NONSTD(void *) apr_bucket_alloc(apr_size_t in_size, apr_bucket_alloc_t *list) { node_header_t *node; apr_memnode_t 
*active = list->blocks; + apr_size_t size = in_size; char *endp; size += SIZEOF_NODE_HEADER_T; + if (size < in_size) { /* too big? */ + apr_bucket_abort(list); + return NULL; + } + if (size <= SMALL_NODE_SIZE) { if (list->freelist) { node = list->freelist; @@ -140,6 +175,7 @@ APU_DECLARE_NONSTD(void) apr_bucket_alloc_destroy( list->blocks = apr_allocator_alloc(list->allocator, ALLOC_AMT); if (!list->blocks) { list->blocks = active; + apr_bucket_abort(list); return NULL; } list->blocks->next = active; @@ -156,6 +192,7 @@ APU_DECLARE_NONSTD(void) apr_bucket_alloc_destroy( else { apr_memnode_t *memnode = apr_allocator_alloc(list->allocator, size); if (!memnode) { + apr_bucket_abort(list); return NULL; } node = (node_header_t *)memnode->first_avail; @@ -166,6 +203,94 @@ APU_DECLARE_NONSTD(void) apr_bucket_alloc_destroy( return ((char *)node) + SIZEOF_NODE_HEADER_T; } +APU_DECLARE_NONSTD(struct iovec *) apr_bucket_bulk_alloc(apr_size_t block_size, + apr_size_t *total_size, + apr_bucket_alloc_t *list, + apr_size_t *blocks_num, + int blocks_fixed) +{ + struct iovec *vec; + apr_size_t size; + + if (block_size <= SMALL_NODE_SIZE - SIZEOF_NODE_HEADER_T) { + void *mem; + + mem = apr_bucket_alloc(block_size, list); + if (!mem) { + return NULL; + } + + vec = apr_bucket_alloc(sizeof(struct iovec), list); + if (!vec) { + apr_bucket_free(mem); + return NULL; + } + if (blocks_fixed) { + size = block_size; + } + else { + size = SMALL_NODE_SIZE; + } + vec->iov_base = mem; + vec->iov_len = size; + + *total_size = size; + if (blocks_num) { + *blocks_num = 1; + } + } + else { + node_header_t *node; + apr_memnode_t *memnode; + apr_size_t i, n = 0; + + size = block_size + SIZEOF_NODE_HEADER_T; + if (size < block_size) { /* too big? 
*/ + apr_bucket_abort(list); + return NULL; + } + + memnode = apr_allocator_bulk_alloc(list->allocator, size, total_size, + &n, blocks_fixed); + if (!memnode) { + apr_bucket_abort(list); + return NULL; + } + + vec = apr_bucket_alloc(n * sizeof(struct iovec), list); + if (!vec) { + apr_allocator_free(list->allocator, memnode); + return NULL; + } + for (size = i = 0; i < n; size += node->size, ++i) { + apr_memnode_t *next = memnode->next; + + node = (node_header_t *)memnode->first_avail; + memnode->first_avail += SIZEOF_NODE_HEADER_T; + memnode->next = NULL; + + node->alloc = list; + node->memnode = memnode; + if (blocks_fixed) { + node->size = block_size; + } + else { + node->size = memnode->endp - memnode->first_avail; + } + vec[i].iov_base = memnode->first_avail; + vec[i].iov_len = node->size; + + memnode = next; + } + *total_size = size; + if (blocks_num) { + *blocks_num = n; + } + } + + return vec; +} + #ifdef APR_BUCKET_DEBUG #if APR_HAVE_STDLIB_H #include <stdlib.h> @@ -200,3 +325,44 @@ APU_DECLARE_NONSTD(void) apr_bucket_free(void *mem apr_allocator_free(list->allocator, node->memnode); } } + +APU_DECLARE_NONSTD(void) apr_bucket_bulk_free(struct iovec *blocks_vec, + apr_size_t blocks_num, + int free_vec) +{ + apr_bucket_alloc_t *list = NULL; + apr_memnode_t *memnode = NULL; + apr_size_t i; + + for (i = 0; i < blocks_num; ++i) { + node_header_t *node; + char *mem = blocks_vec[i].iov_base; + + node = (node_header_t *)(mem - SIZEOF_NODE_HEADER_T); + if (!list) { + list = node->alloc; + } +#ifdef APR_BUCKET_DEBUG + else if (list != node->alloc) { + abort(); + } +#endif + if (node->size == SMALL_NODE_SIZE) { + check_not_already_free(node); + node->next = list->freelist; + list->freelist = node; + } + else { + apr_memnode_t *n = node->memnode; + n->next = memnode; + memnode = n; + } + } + if (memnode) { + apr_allocator_free(list->allocator, memnode); + } + + if (free_vec) { + apr_bucket_free(blocks_vec); + } +} Index: srclib/apr-util/buckets/apr_buckets_file.c 
=================================================================== --- srclib/apr-util/buckets/apr_buckets_file.c (revision 1732829) +++ srclib/apr-util/buckets/apr_buckets_file.c (working copy) @@ -78,7 +78,6 @@ static apr_status_t file_bucket_read(apr_bucket *e apr_bucket_file *a = e->data; apr_file_t *f = a->fd; apr_bucket *b = NULL; - char *buf; apr_status_t rv; apr_size_t filelength = e->length; /* bytes remaining in file past offset */ apr_off_t fileoffset = e->start; @@ -85,6 +84,15 @@ static apr_status_t file_bucket_read(apr_bucket *e #if APR_HAS_THREADS && !APR_HAS_XTHREAD_FILES apr_int32_t flags; #endif + apr_size_t size; + int scattered = 0; + union { + char *buf; + struct { + struct iovec *v; + apr_size_t n; + } io; + } u = {0}; #if APR_HAS_MMAP if (file_make_mmap(e, filelength, fileoffset, a->readpool)) { @@ -108,36 +116,119 @@ static apr_status_t file_bucket_read(apr_bucket *e } #endif - *len = (filelength > APR_BUCKET_BUFF_SIZE) - ? APR_BUCKET_BUFF_SIZE - : filelength; *str = NULL; /* in case we die prematurely */ - buf = apr_bucket_alloc(*len, e->list); + *len = 0; /* Handle offset ... 
*/ rv = apr_file_seek(f, APR_SET, &fileoffset); if (rv != APR_SUCCESS) { - apr_bucket_free(buf); return rv; } - rv = apr_file_read(f, buf, len); - if (rv != APR_SUCCESS && rv != APR_EOF) { - apr_bucket_free(buf); + + size = filelength; + if (size > a->read_size) { + if (a->can_scatter) { + scattered = 1; + } + else { + size = a->read_size; + } + } + if (scattered) { + int fixed = (a->can_scatter == APR_BUCKET_FILE_SCATTER_FIXED); + u.io.v = apr_bucket_bulk_alloc(a->read_size, &size, e->list, + &u.io.n, fixed); + rv = apr_file_readv(f, u.io.v, u.io.n, &size); +#if 0 + fprintf(stderr, "file_bucket_read_vec: %"APR_SIZE_T_FMT"/%"APR_SIZE_T_FMT" bytes: %i\n", + size, filelength, rv); + fflush(stdout); +#endif + } + else { + u.buf = apr_bucket_alloc(size, e->list); + rv = apr_file_read(f, u.buf, &size); +#if 0 + fprintf(stderr, "file_bucket_read_buf: %"APR_SIZE_T_FMT"/%"APR_SIZE_T_FMT" bytes\n: %i", + size, filelength, rv); + fflush(stdout); +#endif + } + if (rv != APR_SUCCESS) { + if (scattered) { + apr_bucket_bulk_free(u.io.v, u.io.n, 1); + } + else { + apr_bucket_free(u.buf); + } + if (rv == APR_EOF) { + /* Change the current bucket to an empty one. */ + apr_bucket_immortal_make(e, *str = "", 0); + file_bucket_destroy(a); + } return rv; } - filelength -= *len; + filelength -= size; + /* - * Change the current bucket to refer to what we read, - * even if we read nothing because we hit EOF. + * Change the current bucket to refer to what we read. 
*/ - apr_bucket_heap_make(e, buf, *len, apr_bucket_free); + if (scattered) { + apr_size_t avail = size, i; + *str = u.io.v[0].iov_base; + *len = u.io.v[0].iov_len; + if (*len < avail) { + avail -= *len; + } + else { + *len = avail; + avail = 0; + } +#if 0 + fprintf(stderr, "file_bucket_read[00]: " + "%"APR_SIZE_T_FMT" bytes (%pp)\n", + *len, u.io.v[0].iov_base); + fflush(stdout); +#endif + apr_bucket_heap_make(e, *str, *len, apr_bucket_free); + for (i = 1; avail && i < u.io.n; ++i) { + apr_size_t n = u.io.v[i].iov_len; + if (n < avail) { + avail -= n; + } + else { + n = avail; + avail = 0; + } +#if 0 + fprintf(stderr, "file_bucket_read[%.2"APR_SIZE_T_FMT"]: " + "%"APR_SIZE_T_FMT" bytes (%pp)\n", + i, n, u.io.v[i].iov_base); + fflush(stdout); +#endif + b = apr_bucket_heap_create(u.io.v[i].iov_base, n, + apr_bucket_free, e->list); + APR_BUCKET_INSERT_AFTER(e, b); + e = b; + } + if (i < u.io.n) { + apr_bucket_bulk_free(u.io.v + i, u.io.n - i, 0); + } + apr_bucket_free(u.io.v); + } + else { + *str = u.buf; + *len = size; + apr_bucket_heap_make(e, *str, *len, apr_bucket_free); + } + /* If we have more to read from the file, then create another bucket */ - if (filelength > 0 && rv != APR_EOF) { + if (filelength > 0) { /* for efficiency, we can just build a new apr_bucket struct * to wrap around the existing file bucket */ b = apr_bucket_alloc(sizeof(*b), e->list); - b->start = fileoffset + (*len); + b->start = fileoffset + size; b->length = filelength; b->data = a; b->type = &apr_bucket_type_file; @@ -149,7 +240,6 @@ static apr_status_t file_bucket_read(apr_bucket *e file_bucket_destroy(a); } - *str = buf; return rv; } @@ -165,6 +255,8 @@ APU_DECLARE(apr_bucket *) apr_bucket_file_make(apr #if APR_HAS_MMAP f->can_mmap = 1; #endif + f->can_scatter = APR_BUCKET_FILE_SCATTER_OFF; + f->read_size = APR_BUCKET_BUFF_SIZE; b = apr_bucket_shared_make(b, f, offset, len); b->type = &apr_bucket_type_file; @@ -197,7 +289,60 @@ APU_DECLARE(apr_status_t) apr_bucket_file_enable_m #endif 
/* APR_HAS_MMAP */
 }
 
+APU_DECLARE(apr_status_t) apr_bucket_file_enable_scatter(apr_bucket *e,
+                                                         apr_bucket_file_scatter_e mode)
+{
+    apr_bucket_file *a = e->data;
+    switch (mode) {
+    case APR_BUCKET_FILE_SCATTER_ON:
+        a->can_scatter = APR_BUCKET_FILE_SCATTER_ON;
+        break;
+    case APR_BUCKET_FILE_SCATTER_FIXED:
+        a->can_scatter = APR_BUCKET_FILE_SCATTER_FIXED;
+        break;
+    case APR_BUCKET_FILE_SCATTER_OFF:
+        a->can_scatter = APR_BUCKET_FILE_SCATTER_OFF;
+        break;
+    default:
+        return APR_EINVAL;
+    }
+    return APR_SUCCESS;
+}
+
+/*
+ * The allocator will always align (round up) the requested size to its
+ * boundary size (a multiple of the system's page size).
+ *
+ * So we want to account for some potential "external" overhead, which for
+ * instance could happen when an aligned heap bucket's buffer is given to some
+ * application (e.g. a cryptographic library) which would then give back a
+ * transient buffer to set aside, with almost the same size (e.g. plus the
+ * header of a TLS record encapsulation, a MAC, tag, ...).
+ *
+ * By subtracting such a small overhead here, we allow for optimal recycling
+ * and reuse of the aligned heap buffers in this case; otherwise we'd end up
+ * with a different size for the original block and the one set aside, which
+ * would hurt the bulk fixed-size allocations (at least).
+ *
+ * 32 bytes should be enough...
+ */
+#define EXTERNAL_OVERHEAD 32
+
+APU_DECLARE(apr_status_t) apr_bucket_file_read_size_set(apr_bucket *e,
+                                                        apr_size_t size)
+{
+    apr_bucket_file *a = e->data;
+
+    if (size < APR_BUCKET_BUFF_SIZE) {
+        a->read_size = APR_BUCKET_BUFF_SIZE;
+    }
+    else {
+        a->read_size = apr_bucket_alloc_floor(size) - EXTERNAL_OVERHEAD;
+    }
+
+    return APR_SUCCESS;
+}
+
 static apr_status_t file_bucket_setaside(apr_bucket *data, apr_pool_t *reqpool)
 {
     apr_bucket_file *a = data->data;
Index: srclib/apr/include/apr_file_io.h
===================================================================
--- srclib/apr/include/apr_file_io.h (revision 1763844)
+++ srclib/apr/include/apr_file_io.h (working copy)
@@ -450,6 +450,27 @@ APR_DECLARE(apr_status_t) apr_file_read(apr_file_t
                                         apr_size_t *nbytes);
 
 /**
+ * Read data from the specified file into an iovec.
+ * @param thefile The file descriptor to read from.
+ * @param vec The iovec to scatter the data to.
+ * @param nvec The number of elements in the iovec.
+ * @param nbytes On exit, the number of bytes read.
+ *
+ * @remark apr_file_readv() will read up to the specified number of
+ * bytes, but never more.  If there isn't enough data to fill that
+ * number of bytes, all of the available data is read.  The fourth
+ * argument is modified to reflect the number of bytes read.  If a
+ * char was put back into the stream via ungetc, or buffered data exist,
+ * they will be the first characters returned.
+ *
+ * @remark It is not possible for both bytes to be read and an #APR_EOF
+ * or other error to be returned.  #APR_EINTR is never returned.
+ */
+APR_DECLARE(apr_status_t) apr_file_readv(apr_file_t *thefile,
+                                         struct iovec *vec, apr_size_t nvec,
+                                         apr_size_t *nbytes);
+
+/**
  * Write data to the specified file.
  * @param thefile The file descriptor to write to.
  * @param buf The buffer which contains the data.
Index: srclib/apr/file_io/os2/readwrite.c
===================================================================
--- srclib/apr/file_io/os2/readwrite.c (revision 1763844)
+++ srclib/apr/file_io/os2/readwrite.c (working copy)
@@ -122,6 +122,17 @@ APR_DECLARE(apr_status_t) apr_file_read(apr_file_t
 
+APR_DECLARE(apr_status_t) apr_file_readv(apr_file_t *thefile,
+                                         struct iovec *vec, apr_size_t nvec,
+                                         apr_size_t *nbytes)
+{
+    /* TODO: HAVE_READV?
*/ + *nbytes = 0; + return APR_ENOTIMPL; +} + + + APR_DECLARE(apr_status_t) apr_file_write(apr_file_t *thefile, const void *buf, apr_size_t *nbytes) { ULONG rc = 0; Index: srclib/apr/file_io/unix/readwrite.c =================================================================== --- srclib/apr/file_io/unix/readwrite.c (revision 1763844) +++ srclib/apr/file_io/unix/readwrite.c (working copy) @@ -144,6 +144,115 @@ APR_DECLARE(apr_status_t) apr_file_read(apr_file_t } } +APR_DECLARE(apr_status_t) apr_file_readv(apr_file_t *thefile, + struct iovec *vec, apr_size_t nvec, + apr_size_t *nbytes) +{ + apr_status_t status = APR_SUCCESS; + apr_size_t len, shift, n = 0; + apr_ssize_t rv; + char *pos; + + *nbytes = 0; + do { + if (n >= nvec) { + return APR_SUCCESS; + } + pos = vec[n].iov_base; + len = vec[n].iov_len; + } while (!len && ++n); + + /* Soak up ungetc (if any) */ + if (thefile->ungetchar != -1) { + *pos = (char)thefile->ungetchar; + thefile->ungetchar = -1; + ++*nbytes; + pos++; + len--; + while (!len) { + if (++n >= nvec) { + return APR_SUCCESS; + } + pos = vec[n].iov_base; + len = vec[n].iov_len; + } + } + + /* Soak up buffered data (if any), but don't buffer this readv, + * we want to scatter the input into the iovec without copying. 
+ */ + if (thefile->buffered) { + file_lock(thefile); + if (thefile->direction == 1) { + rv = apr_file_flush_locked(thefile); + if (rv) { + file_unlock(thefile); + return rv; + } + thefile->bufpos = 0; + thefile->direction = 0; + thefile->dataRead = 0; + } + else { + while (thefile->bufpos < thefile->dataRead) { + apr_size_t buflen = thefile->dataRead - thefile->bufpos; + if (buflen > len) { + buflen = len; + } + memcpy(pos, thefile->buffer + thefile->bufpos, buflen); + thefile->bufpos += buflen; + *nbytes += buflen; + pos += buflen; + len -= buflen; + while (!len) { + if (++n >= nvec) { + file_unlock(thefile); + return APR_SUCCESS; + } + pos = vec[n].iov_base; + len = vec[n].iov_len; + } + } + } + file_unlock(thefile); + } + + shift = vec[n].iov_len - len; + if (shift) { + vec[n].iov_base = (char *)vec[n].iov_base + shift; + vec[n].iov_len -= shift; + } + do { + rv = readv(thefile->filedes, vec + n, nvec - n); + } while (rv == -1 && (status = errno) == EINTR); +#ifdef USE_WAIT_FOR_IO + if (rv == -1 && APR_STATUS_IS_EAGAIN(status) && + !*nbytes && thefile->timeout) { + status = apr_wait_for_io_or_timeout(thefile, NULL, 1); + if (status == APR_SUCCESS) { + do { + rv = readv(thefile->filedes, vec + n, nvec - n); + } while (rv == -1 && (status = errno) == EINTR); + } + } +#endif + if (shift) { + vec[n].iov_base = (char *)vec[n].iov_base - shift; + vec[n].iov_len += shift; + } + if (rv > 0) { + *nbytes += rv; + } + if (*nbytes) { + return APR_SUCCESS; + } + if (rv == 0) { + thefile->eof_hit = TRUE; + return APR_EOF; + } + return status; +} + APR_DECLARE(apr_status_t) apr_file_write(apr_file_t *thefile, const void *buf, apr_size_t *nbytes) { apr_size_t rv; Index: srclib/apr/file_io/win32/readwrite.c =================================================================== --- srclib/apr/file_io/win32/readwrite.c (revision 1763844) +++ srclib/apr/file_io/win32/readwrite.c (working copy) @@ -243,6 +243,15 @@ APR_DECLARE(apr_status_t) apr_file_read(apr_file_t return rv; } 
+APR_DECLARE(apr_status_t) apr_file_readv(apr_file_t *thefile, + struct iovec *vec, apr_size_t nvec, + apr_size_t *nbytes) +{ + /* TODO: ReadFileScatter()? */ + *nbytes = 0; + return APR_ENOTIMPL; +} + APR_DECLARE(apr_status_t) apr_file_write(apr_file_t *thefile, const void *buf, apr_size_t *nbytes) { apr_status_t rv;