Issue all IO requests to the remote server in aligned 256 KiB chunks, splitting any read that spans a chunk boundary into per-chunk requests. Signed-off-by: David Edmondson <david.edmond...@oracle.com> --- block/curl.c | 151 ++++++++++++++++++++++++++++++++------------- block/trace-events | 2 + 2 files changed, 109 insertions(+), 44 deletions(-)
diff --git a/block/curl.c b/block/curl.c index d2f4de46c9..cfc518efda 100644 --- a/block/curl.c +++ b/block/curl.c @@ -78,6 +78,14 @@ static CURLMcode __curl_multi_socket_action(CURLM *multi_handle, #define CURL_BLOCK_OPT_SSLVERIFY_DEFAULT true #define CURL_BLOCK_OPT_TIMEOUT_DEFAULT 5 +/* Must be a non-zero power of 2. */ +#define CURL_BLOCK_SIZE (256 * 1024) + +/* Align "n" to the start of the containing block. */ +#define CURL_BLOCK_ALIGN(n) ((n) & ~(CURL_BLOCK_SIZE - 1)) +/* The offset of "n" within its block. */ +#define CURL_BLOCK_OFFSET(n) ((n) & (CURL_BLOCK_SIZE - 1)) + struct BDRVCURLState; struct CURLState; @@ -86,11 +94,18 @@ static bool libcurl_initialized; typedef struct CURLAIOCB { Coroutine *co; QEMUIOVector *qiov; + uint64_t qiov_offset; /* Offset in qiov to place data. */ uint64_t offset; uint64_t bytes; int ret; + /* + * start and end indicate the subset of the surrounding + * CURL_BLOCK_SIZE sized block that is the subject of this + * IOCB. They are offsets from the beginning of the underlying + * buffer. 
+ */ size_t start; size_t end; } CURLAIOCB; @@ -110,7 +125,6 @@ typedef struct CURLState char *orig_buf; uint64_t buf_start; size_t buf_off; - size_t buf_len; char range[128]; char errmsg[CURL_ERROR_SIZE]; char in_use; @@ -259,11 +273,11 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque) goto read_end; } - if (s->buf_off >= s->buf_len) { + if (s->buf_off >= CURL_BLOCK_SIZE) { /* buffer full, read nothing */ goto read_end; } - realsize = MIN(realsize, s->buf_len - s->buf_off); + realsize = MIN(realsize, CURL_BLOCK_SIZE - s->buf_off); memcpy(s->orig_buf + s->buf_off, ptr, realsize); s->buf_off += realsize; @@ -281,35 +295,44 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len, uint64_t clamped_end = MIN(end, s->len); uint64_t clamped_len = clamped_end - start; - for (i=0; i<CURL_NUM_STATES; i++) { + for (i = 0; i < CURL_NUM_STATES; i++) { CURLState *state = &s->states[i]; - uint64_t buf_end = (state->buf_start + state->buf_off); - uint64_t buf_fend = (state->buf_start + state->buf_len); + /* The end of the currently valid data. */ + uint64_t buf_end = state->buf_start + state->buf_off; + /* The end of the valid data when the IO completes. */ + uint64_t buf_fend = state->buf_start + CURL_BLOCK_SIZE; if (!state->orig_buf) continue; if (!state->buf_off) continue; - // Does the existing buffer cover our section? + /* + * Does the existing buffer cover our section? 
+ */ if ((start >= state->buf_start) && (start <= buf_end) && (clamped_end >= state->buf_start) && (clamped_end <= buf_end)) { - char *buf = state->orig_buf + (start - state->buf_start); + char *buf = state->orig_buf + CURL_BLOCK_OFFSET(start); trace_curl_pending_hit(qemu_coroutine_self(), start, len); - qemu_iovec_from_buf(acb->qiov, 0, buf, clamped_len); + qemu_iovec_from_buf(acb->qiov, acb->qiov_offset, buf, clamped_len); if (clamped_len < len) { - qemu_iovec_memset(acb->qiov, clamped_len, 0, len - clamped_len); + qemu_iovec_memset(acb->qiov, acb->qiov_offset + clamped_len, + 0, len - clamped_len); } acb->ret = 0; return true; } - // Wait for unfinished chunks + /* + * If an in-progress IO will provide the required data, wait + * for it to complete - the initiator will complete this + * aiocb. + */ if (state->in_use && (start >= state->buf_start) && (start <= buf_fend) && @@ -320,10 +343,10 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len, trace_curl_pending_piggyback(qemu_coroutine_self(), start, len); - acb->start = start - state->buf_start; + acb->start = CURL_BLOCK_OFFSET(start); acb->end = acb->start + clamped_len; - for (j=0; j<CURL_NUM_ACB; j++) { + for (j = 0; j < CURL_NUM_ACB; j++) { if (!state->acb[j]) { state->acb[j] = acb; return true; @@ -377,7 +400,7 @@ static void curl_multi_check_completion(BDRVCURLState *s) for (i = 0; i < CURL_NUM_ACB; i++) { CURLAIOCB *acb = state->acb[i]; - if (acb == NULL) { + if (!acb) { continue; } @@ -385,14 +408,15 @@ static void curl_multi_check_completion(BDRVCURLState *s) /* Assert that we have read all data */ assert(state->buf_off >= acb->end); - qemu_iovec_from_buf(acb->qiov, 0, + qemu_iovec_from_buf(acb->qiov, acb->qiov_offset, state->orig_buf + acb->start, acb->end - acb->start); if (acb->end - acb->start < acb->bytes) { size_t offset = acb->end - acb->start; - qemu_iovec_memset(acb->qiov, offset, 0, - acb->bytes - offset); + + qemu_iovec_memset(acb->qiov, acb->qiov_offset + offset, + 
0, acb->bytes - offset); } } @@ -539,6 +563,7 @@ static int curl_init_state(BDRVCURLState *s, CURLState *state) static void curl_clean_state(CURLState *s) { int j; + for (j = 0; j < CURL_NUM_ACB; j++) { assert(!s->acb[j]); } @@ -856,18 +881,26 @@ static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb) BDRVCURLState *s = bs->opaque; - uint64_t start = acb->offset; - uint64_t end; + /* + * Our caller must ensure that this request does not span two + * blocks. + */ + assert(CURL_BLOCK_ALIGN(acb->offset) == + CURL_BLOCK_ALIGN(acb->offset + acb->bytes - 1)); qemu_mutex_lock(&s->mutex); - // In case we have the requested data already (e.g. read-ahead), - // we can just call the callback and be done. - if (curl_find_buf(s, start, acb->bytes, acb)) { + /* + * Check whether the requested data can be found in an existing or + * pending IO request. + */ + if (curl_find_buf(s, acb->offset, acb->bytes, acb)) { goto out; } - // No cache found, so let's start a new request + /* + * No cache found, so let's start a new request. 
+ */ for (;;) { state = curl_find_state(s); if (state) { @@ -882,16 +915,15 @@ static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb) goto out; } - acb->start = 0; - acb->end = MIN(acb->bytes, s->len - start); + acb->start = CURL_BLOCK_OFFSET(acb->offset); + acb->end = acb->start + MIN(acb->bytes, s->len - acb->offset); state->buf_off = 0; - g_free(state->orig_buf); - state->buf_start = start; - state->buf_len = MIN(acb->end, s->len - start); - end = start + state->buf_len - 1; - state->orig_buf = g_try_malloc(state->buf_len); - if (state->buf_len && state->orig_buf == NULL) { + state->buf_start = CURL_BLOCK_ALIGN(acb->offset); + if (!state->orig_buf) { + state->orig_buf = g_try_malloc(CURL_BLOCK_SIZE); + } + if (!state->orig_buf) { curl_clean_state(state); acb->ret = -ENOMEM; goto out; @@ -899,8 +931,10 @@ static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb) state->acb[0] = acb; snprintf(state->range, 127, "%" PRIu64 "-%" PRIu64, - s->offset + start, s->offset + end); - trace_curl_setup_preadv(qemu_coroutine_self(), start, acb->bytes); + s->offset + state->buf_start, + s->offset + state->buf_start + CURL_BLOCK_SIZE); + trace_curl_setup_preadv(qemu_coroutine_self(), state->buf_start, + CURL_BLOCK_SIZE); curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); if (curl_multi_add_handle(s->multi, state->curl) != CURLM_OK) { @@ -921,21 +955,50 @@ out: static int coroutine_fn curl_co_preadv(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags) { - CURLAIOCB acb = { - .co = qemu_coroutine_self(), - .ret = -EINPROGRESS, - .qiov = qiov, - .offset = offset, - .bytes = bytes - }; + /* + * The lower layer does all IO in single CURL_BLOCK_SIZE sized and + * aligned chunks and cannot handle an IO that spans two blocks, + * so split the request here. 
+ */ + int ret = 0; + uint64_t qiov_offset = 0; + uint64_t off = offset; trace_curl_co_preadv(qemu_coroutine_self(), offset, bytes); - curl_setup_preadv(bs, &acb); - while (acb.ret == -EINPROGRESS) { - qemu_coroutine_yield(); + + while (bytes > 0) { + uint64_t len = MIN(bytes, CURL_BLOCK_SIZE - CURL_BLOCK_OFFSET(off)); + CURLAIOCB acb = { + .co = qemu_coroutine_self(), + .ret = -EINPROGRESS, + .qiov = qiov, + .qiov_offset = qiov_offset, + .offset = off, + .bytes = len, + }; + + trace_curl_co_preadv_segment(qemu_coroutine_self(), off, len); + + curl_setup_preadv(bs, &acb); + while (acb.ret == -EINPROGRESS) { + qemu_coroutine_yield(); + } + + ret = acb.ret; + if (ret != 0) { + return ret; + } + + trace_curl_co_preadv_segment_done(qemu_coroutine_self()); + + qiov_offset += len; + off += len; + bytes -= len; } + trace_curl_co_preadv_done(qemu_coroutine_self()); - return acb.ret; + + return ret; } static void curl_close(BlockDriverState *bs) diff --git a/block/trace-events b/block/trace-events index 0b52d2ca1d..72b1e927bf 100644 --- a/block/trace-events +++ b/block/trace-events @@ -200,6 +200,8 @@ curl_open(const char *file) "opening %s" curl_open_size(uint64_t size) "size = %" PRIu64 curl_co_preadv(void *co, uint64_t offset, uint64_t bytes) "co %p requests 0x%" PRIx64 " + 0x%" PRIx64 curl_co_preadv_done(void *co) "co %p done" +curl_co_preadv_segment(void *co, uint64_t offset, uint64_t bytes) "co %p requests 0x%" PRIx64 " + 0x%" PRIx64 +curl_co_preadv_segment_done(void *co) "co %p done" curl_setup_preadv(void *co, uint64_t offset, uint64_t bytes) "co %p requests 0x%" PRIx64 " + 0x%" PRIx64 curl_pending_hit(void *co, uint64_t start, uint64_t len) "co %p finds 0x%" PRIx64 " + 0x%" PRIx64 curl_pending_piggyback(void *co, uint64_t start, uint64_t len) "co %p pending 0x%" PRIx64 " + 0x%" PRIx64 -- 2.27.0