Make subsequecial changes to make use of introduced CURLDataCache. Moved acb struct from CURLState to BDRVCURLState, and changed to list.
Signed-off-by: Fam Zheng <f...@redhat.com> --- block/curl.c | 156 +++++++++++++++++++++++++++++------------------------------ 1 file changed, 78 insertions(+), 78 deletions(-) diff --git a/block/curl.c b/block/curl.c index be9c32e..7336cdb 100644 --- a/block/curl.c +++ b/block/curl.c @@ -39,7 +39,6 @@ CURLPROTO_TFTP) #define CURL_NUM_STATES 8 -#define CURL_NUM_ACB 8 #define SECTOR_SIZE 512 #define READ_AHEAD_SIZE (256 * 1024) @@ -52,9 +51,7 @@ typedef struct CURLAIOCB { int64_t sector_num; int nb_sectors; - - size_t start; - size_t end; + QLIST_ENTRY(CURLAIOCB) next; } CURLAIOCB; typedef struct CURLDataCache { @@ -70,14 +67,10 @@ typedef struct CURLDataCache { typedef struct CURLState { struct BDRVCURLState *s; - CURLAIOCB *acb[CURL_NUM_ACB]; CURL *curl; - char *orig_buf; - size_t buf_start; - size_t buf_off; - size_t buf_len; char range[128]; char errmsg[CURL_ERROR_SIZE]; + CURLDataCache *cache; char in_use; } CURLState; @@ -92,6 +85,7 @@ typedef struct BDRVCURLState { CURLM *multi; size_t len; CURLState states[CURL_NUM_STATES]; + QLIST_HEAD(, CURLAIOCB) acbs; QLIST_HEAD(, CURLSockInfo) socks; char *url; size_t readahead_size; @@ -221,30 +215,31 @@ static void curl_complete_io(BDRVCURLState *bs, CURLAIOCB *acb, static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque) { - CURLState *s = ((CURLState*)opaque); + CURLState *s = (CURLState *)opaque; + CURLDataCache *c = s->cache; size_t realsize = size * nmemb; - int i; - - DPRINTF("CURL: Just reading %zd bytes\n", realsize); + CURLAIOCB *acb; - if (!s || !s->orig_buf) + if (!c || !c->data) { goto read_end; + } + if (c->write_pos >= c->data_len) { + goto read_end; + } + memcpy(c->data + c->write_pos, ptr, + MIN(realsize, c->data_len - c->write_pos)); + c->write_pos += realsize; + if (c->write_pos >= c->data_len) { + c->write_pos = c->data_len; + } - memcpy(s->orig_buf + s->buf_off, ptr, realsize); - s->buf_off += realsize; - - for(i=0; i<CURL_NUM_ACB; i++) { - CURLAIOCB *acb = s->acb[i]; - - if (!acb) - continue; - - if ((s->buf_off >= acb->end)) { - qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start, - acb->end - acb->start); - acb->common.cb(acb->common.opaque, 0); - qemu_aio_release(acb); - s->acb[i] = NULL; + QLIST_FOREACH(acb, &s->s->acbs, next) { + size_t aio_base = acb->sector_num * SECTOR_SIZE; + size_t aio_len = acb->nb_sectors * SECTOR_SIZE; + if (aio_base >= c->base_pos && + aio_base + aio_len <= c->base_pos + c->write_pos) { + QLIST_REMOVE(acb, next); + curl_complete_io(s->s, acb, c); } } @@ -275,10 +270,12 @@ static void curl_fd_handler(void *arg) CURLMsg *msg; msg = curl_multi_info_read(s->multi, &msgs_in_queue); - if (!msg) + if (!msg) { break; - if (msg->msg == CURLMSG_NONE) + } + if (msg->msg == CURLMSG_NONE) { break; + } switch (msg->msg) { case CURLMSG_DONE: @@ -288,19 +285,17 @@ static void curl_fd_handler(void *arg) CURLINFO_PRIVATE, (char **)&state); - /* ACBs for successful messages get completed in curl_read_cb */ + /* ACBs for successful messages get completed in curl_read_cb, + * fail existing acbs for now */ if (msg->data.result != CURLE_OK) { - int i; - for (i = 0; i < CURL_NUM_ACB; i++) { - CURLAIOCB *acb = state->acb[i]; - - if (acb == NULL) { - continue; - } - + CURLAIOCB *acb = QLIST_FIRST(&s->acbs); + while (acb) { + CURLAIOCB *next = QLIST_NEXT(acb, next); + DPRINTF("EIO, %s\n", state->errmsg); acb->common.cb(acb->common.opaque, -EIO); + QLIST_REMOVE(acb, next); qemu_aio_release(acb); - state->acb[i] = NULL; + acb = next; } } @@ -317,13 +312,10 @@ static void curl_fd_handler(void *arg) static CURLState *curl_init_state(BDRVCURLState *s) { CURLState *state = NULL; - int i, j; + int i; do { for (i=0; i<CURL_NUM_STATES; i++) { - for (j=0; j<CURL_NUM_ACB; j++) - if (s->states[i].acb[j]) - continue; if (s->states[i].in_use) continue; @@ -380,6 +372,10 @@ static void curl_clean_state(CURLState *s) if (s->s->multi) curl_multi_remove_handle(s->s->multi, s->curl); s->in_use = 0; + if (s->cache) { + s->cache->use_count--; + assert(s->cache->use_count >= 0); + } } static void curl_parse_filename(const char *filename, QDict *options, @@ -548,14 +544,8 @@ out_noclean: static int curl_aio_flush(void *opaque) { BDRVCURLState *s = opaque; - int i, j; - - for (i=0; i < CURL_NUM_STATES; i++) { - for(j=0; j < CURL_NUM_ACB; j++) { - if (s->states[i].acb[j]) { - return 1; - } - } + if (!QLIST_EMPTY(&s->acbs)) { + return 1; } return 0; } @@ -577,6 +567,7 @@ static void curl_readv_bh_cb(void *p) CURLDataCache *cache = NULL; CURLAIOCB *acb = p; BDRVCURLState *s = acb->common.bs->opaque; + int running; size_t aio_base, aio_bytes; qemu_bh_delete(acb->bh); @@ -585,8 +576,9 @@ static void curl_readv_bh_cb(void *p) aio_base = acb->sector_num * SECTOR_SIZE; aio_bytes = acb->nb_sectors * SECTOR_SIZE; - size_t start = acb->sector_num * SECTOR_SIZE; - size_t end; + if (aio_base + aio_bytes > s->len) { + goto err_release; + } cache = curl_find_cache(s, aio_base, aio_bytes); if (cache) { @@ -597,29 +589,41 @@ static void curl_readv_bh_cb(void *p) // No cache found, so let's start a new request state = curl_init_state(s); if (!state) { - acb->common.cb(acb->common.opaque, -EIO); - qemu_aio_release(acb); - return; + goto err_release; } - acb->start = 0; - acb->end = (acb->nb_sectors * SECTOR_SIZE); - - state->buf_off = 0; - if (state->orig_buf) - g_free(state->orig_buf); - state->buf_start = start; - state->buf_len = acb->end + s->readahead_size; - end = MIN(start + state->buf_len, s->len) - 1; - state->orig_buf = g_malloc(state->buf_len); - state->acb[0] = acb; - - snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", start, end); - DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n", - (acb->nb_sectors * SECTOR_SIZE), start, state->range); - curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); + cache = g_malloc0(sizeof(CURLDataCache)); + cache->base_pos = acb->sector_num * SECTOR_SIZE; + cache->data_len = aio_bytes + s->readahead_size; + cache->write_pos = 0; + cache->data = g_malloc(cache->data_len); + QLIST_INSERT_HEAD(&s->acbs, acb, next); + snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", cache->base_pos, + cache->base_pos + cache->data_len); + DPRINTF("Reading range: %s\n", state->range); + curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); + QLIST_INSERT_HEAD(&s->cache, cache, next); + state->cache = cache; + cache->use_count++; curl_multi_add_handle(s->multi, state->curl); + /* kick off curl to start the action */ + curl_multi_socket_action(s->multi, 0, CURL_SOCKET_TIMEOUT, &running); + return; + +err_release: + if (cache) { + if (cache->data) { + g_free(cache->data); + cache->data = NULL; + } + g_free(cache); + cache = NULL; + } + acb->common.cb(acb->common.opaque, -EIO); + qemu_aio_release(acb); + return; + } @@ -666,10 +670,6 @@ static void curl_close(BlockDriverState *bs) curl_easy_cleanup(s->states[i].curl); s->states[i].curl = NULL; } - if (s->states[i].orig_buf) { - g_free(s->states[i].orig_buf); - s->states[i].orig_buf = NULL; - } } if (s->multi) curl_multi_cleanup(s->multi); -- 1.8.1.4