Am 18.02.2011 13:55, schrieb Nick Thomas: > This preserves the previous behaviour where the NBD server is > unavailable or goes away during guest execution, but switches the > NBD backend to present the AIO interface instead of the sync IO > interface. > > We also split write requests into 1 MiB blocks (minus request size). > This is a hard limit in the NBD servers (including qemu-nbd), but > never seemed to come up with the previous backend code. > --- > block/nbd.c | 555 > ++++++++++++++++++++++++++++++++++++++++++++++++++--------- > 1 files changed, 470 insertions(+), 85 deletions(-) > > diff --git a/block/nbd.c b/block/nbd.c > index c8dc763..e7b1f7e 100644 > --- a/block/nbd.c > +++ b/block/nbd.c > @@ -1,11 +1,12 @@ > /* > - * QEMU Block driver for NBD > + * QEMU Block driver for NBD - asynchronous IO > * > * Copyright (C) 2008 Bull S.A.S. > * Author: Laurent Vivier <laurent.viv...@bull.net> > * > * Some parts: > * Copyright (C) 2007 Anthony Liguori <anth...@codemonkey.ws> > + * Copyright (C) 2011 Nick Thomas <n...@bytemark.co.uk> > * > * Permission is hereby granted, free of charge, to any person obtaining a > copy > * of this software and associated documentation files (the "Software"), to > deal > @@ -27,66 +28,135 @@ > */ > > #include "qemu-common.h" > +#include "qemu_socket.h" > #include "nbd.h" > #include "module.h" > > #include <sys/types.h> > #include <unistd.h> > > -#define EN_OPTSTR ":exportname=" > +#define EN_OPTSTR ":exportname=" > +#define SECTOR_SIZE 512 > + > +/* 1MiB minus request header size */ > +#define MAX_NBD_WRITE ((1024*1024) - (4 + 4 + 8 + 8 + 4))
-EMAGIC :-) I think we can add a uint32_t magic to struct nbd_request, make it packed and then replace this magic "4 + 4 + 8 + 8 + 4" thing by "sizeof(struct nbd_request)" everywhere. > + > /* #define DEBUG_NBD */ > + > +#if defined(DEBUG_NBD) > +#define logout(fmt, ...) \ > + fprintf(stderr, "nbd\t%-24s" fmt, __func__, ##__VA_ARGS__) > +#else > +#define logout(fmt, ...) ((void)0) > +#endif > + > + > +typedef struct NBDAIOCB NBDAIOCB; > + > +typedef struct AIOReq { > + NBDAIOCB *aiocb; > + off_t iov_offset; /* Where on the iov does this req start? */ > + off_t offset; /* Starting point of the read */ > + > + size_t data_len; > + uint8_t flags; > + uint64_t handle; > + > + QLIST_ENTRY(AIOReq) outstanding_aio_siblings; > + QLIST_ENTRY(AIOReq) aioreq_siblings; > +} AIOReq; > + > > typedef struct BDRVNBDState { > int sock; > off_t size; > size_t blocksize; > + > + /* Filled in by nbd_config. Store host_spec because DNS may change */ > + bool tcp_conn; /* True, we use TCP. False, UNIX domain sockets */ Should it be an enum then? > + char *export_name; /* An NBD server may export several devices */ > + char *host_spec; /* Path to socket (UNIX) or hostname/IP (TCP) */ > + uint16_t tcp_port; Is this part really related to AIO? If not, it should be a patch of its own. > + > + /* We use these for asynchronous I/O */ > + uint64_t aioreq_seq_num; > + QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head; > } BDRVNBDState; > > -static int nbd_open(BlockDriverState *bs, const char* filename, int flags) > +enum AIOCBState { > + AIOCB_WRITE_UDATA, > + AIOCB_READ_UDATA, > +}; > + > +struct NBDAIOCB { > + BlockDriverAIOCB common; > + > + QEMUIOVector *qiov; > + > + int64_t sector_num; > + int nb_sectors; > + > + int ret; > + enum AIOCBState aiocb_type; > + > + QEMUBH *bh; > + void (*aio_done_func)(NBDAIOCB *); > + > + int canceled; bool? 
> + > + QLIST_HEAD(aioreq_head, AIOReq) aioreq_head; > +}; > + > +static inline int free_aio_req(BDRVNBDState *s, AIOReq *aio_req) > { > - BDRVNBDState *s = bs->opaque; > - uint32_t nbdflags; > + NBDAIOCB *acb = aio_req->aiocb; > + QLIST_REMOVE(aio_req, outstanding_aio_siblings); > + QLIST_REMOVE(aio_req, aioreq_siblings); > + qemu_free(aio_req); > > + return !QLIST_EMPTY(&acb->aioreq_head); > +} > + > +static int nbd_config(BDRVNBDState *s, const char* filename, int flags) > +{ > char *file; > - char *name; > - const char *host; > + char *export_name; > + const char *host_spec; > const char *unixpath; > - int sock; > - off_t size; > - size_t blocksize; > - int ret; > int err = -EINVAL; > > file = qemu_strdup(filename); > > - name = strstr(file, EN_OPTSTR); > - if (name) { > - if (name[strlen(EN_OPTSTR)] == 0) { > + export_name = strstr(file, EN_OPTSTR); > + if (export_name) { > + if (export_name[strlen(EN_OPTSTR)] == 0) { > goto out; > } > - name[0] = 0; > - name += strlen(EN_OPTSTR); > + export_name[0] = 0; /* truncate 'file' */ > + export_name += strlen(EN_OPTSTR); > + s->export_name = qemu_strdup(export_name); > } > > - if (!strstart(file, "nbd:", &host)) { > + /* extract the host_spec - fail if it's not nbd:* */ > + if (!strstart(file, "nbd:", &host_spec)) { > goto out; > } > > - if (strstart(host, "unix:", &unixpath)) { > - > - if (unixpath[0] != '/') { > + /* are we a UNIX or TCP socket? 
*/ > + if (strstart(host_spec, "unix:", &unixpath)) { > + if (unixpath[0] != '/') { /* We demand an absolute path*/ > goto out; > } > - > - sock = unix_socket_outgoing(unixpath); > - > + s->tcp_conn = false; > + s->host_spec = qemu_strdup(unixpath); > } else { > + /* We should have an <IPv4 address>:<port> string to split up */ > uint16_t port = NBD_DEFAULT_PORT; > char *p, *r; > char hostname[128]; > > - pstrcpy(hostname, 128, host); > - > - p = strchr(hostname, ':'); > + pstrcpy(hostname, 128, host_spec); > + p = strchr(hostname, ':'); /* FIXME: IPv6 */ > if (p != NULL) { > *p = '\0'; > p++; > @@ -96,121 +166,436 @@ static int nbd_open(BlockDriverState *bs, const char* > filename, int flags) > goto out; > } > } > + s->tcp_conn = true; > + s->host_spec = qemu_strdup(hostname); > + s->tcp_port = port; > + } > + > + err = 0; > + > +out: > + qemu_free(file); > + if (err != 0) { > + if (s->export_name != NULL) { > + qemu_free(s->export_name); > + } > + if (s->host_spec != NULL) { > + qemu_free(s->host_spec); > + } free (and qemu_free) works just fine with NULL. It's defined to do nothing in this case. > + } > + return err; > +} > + > +static void aio_read_response(void *opaque) > +{ > + BDRVNBDState *s = opaque; > + struct nbd_reply reply; > + > + AIOReq *aio_req = NULL; > + NBDAIOCB *acb; > + int rest; > + > + if (QLIST_EMPTY(&s->outstanding_aio_head)) { > + return; > + } > + > + /* read the header */ > + if (nbd_receive_reply(s->sock, &reply) == -1) { > + logout("Failed to read response from socket\n"); > + /* Having failed to read the reply header, we can't know which > + * aio_req this corresponds to - so we can't signal a failure. 
> + */ > + return; > + } > + > + /* find the right aio_req from the outstanding_aio list */ > + QLIST_FOREACH(aio_req, &s->outstanding_aio_head, > outstanding_aio_siblings) { > + if (aio_req->handle == reply.handle) { > + break; > + } > + } > + > + if (!aio_req) { > + logout("cannot find aio_req for handle %lu\n", reply.handle); > + return; > + } > + > + acb = aio_req->aiocb; > + > + if (acb->aiocb_type == AIOCB_READ_UDATA) { > + off_t offset = 0; > + int ret = 0; > + size_t total = aio_req->data_len; > + > + while (offset < total) { > + ret = nbd_wr_aio(s->sock, acb->qiov->iov, total - offset, offset, > + true); > + > + if (ret == -1) { > + logout("Error reading from NBD server: %i (%s)\n", > + errno, strerror(errno)); I think you need to call the AIO callback with ret = -errno here. > + return; > + } > + > + offset += ret; > + } > + } > > - sock = tcp_socket_outgoing(hostname, port); > + if (reply.error != 0) { > + acb->ret = -EIO; > + logout("NBD request resulted in error %i\n", reply.error); > } > > + rest = free_aio_req(s, aio_req); > + if (!rest) { > + acb->aio_done_func(acb); > + } > + > + return; > +} > + > +static int aio_flush_request(void *opaque) > +{ > + BDRVNBDState *s = opaque; > + > + return !QLIST_EMPTY(&s->outstanding_aio_head); > +} > + > +/* > + * Connect to the NBD server specified in the state object > + */ > +static int nbd_establish_connection(BlockDriverState *bs) > +{ > + BDRVNBDState *s = bs->opaque; > + int sock; > + int ret; > + off_t size; > + size_t blocksize; > + uint32_t nbdflags; > + > + if (s->tcp_conn == true) { > + sock = tcp_socket_outgoing(s->host_spec, s->tcp_port); > + } else { > + sock = unix_socket_outgoing(s->host_spec); > + } > + > + /* Failed to establish connection */ > if (sock == -1) { > - err = -errno; > - goto out; > + logout("Failed to establish connection to NBD server\n"); > + return -errno; > } > > - ret = nbd_receive_negotiate(sock, name, &nbdflags, &size, &blocksize); > + /* NBD handshake */ > + ret = 
nbd_receive_negotiate(sock, s->export_name, &nbdflags, &size, > + &blocksize); > if (ret == -1) { > - err = -errno; > - goto out; > + logout("Failed to negotiate with the NBD server\n"); > + closesocket(sock); > + return -errno; > } > > + /* Now that we're connected, set the socket to be non-blocking */ > + socket_set_nonblock(sock); > + > s->sock = sock; > s->size = size; > s->blocksize = blocksize; > - err = 0; > > -out: > - qemu_free(file); > - return err; > + /* Response handler. This is called when there is data to read */ > + qemu_aio_set_fd_handler(sock, aio_read_response, NULL, aio_flush_request, > + NULL, s); > + logout("Established connection with NBD server\n"); > + return 0; > } > > -static int nbd_read(BlockDriverState *bs, int64_t sector_num, > - uint8_t *buf, int nb_sectors) > +static void nbd_teardown_connection(BlockDriverState *bs) > { > + /* Send the final packet to the NBD server and close the socket */ > BDRVNBDState *s = bs->opaque; > struct nbd_request request; > - struct nbd_reply reply; > > - request.type = NBD_CMD_READ; > + request.type = NBD_CMD_DISC; > request.handle = (uint64_t)(intptr_t)bs; > - request.from = sector_num * 512;; > - request.len = nb_sectors * 512; > + request.from = 0; > + request.len = 0; > + nbd_send_request(s->sock, &request); > > - if (nbd_send_request(s->sock, &request) == -1) > - return -errno; > + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL); > + closesocket(s->sock); > + logout("Connection to NBD server closed\n"); > + return; > +} > > - if (nbd_receive_reply(s->sock, &reply) == -1) > - return -errno; > +static int nbd_open(BlockDriverState *bs, const char* filename, int flags) > +{ > + BDRVNBDState *s = bs->opaque; > + int result; > > - if (reply.error !=0) > - return -reply.error; > + /* Pop the config into our state object. Exit if invalid. 
*/ > + result = nbd_config(s, filename, flags); > > - if (reply.handle != request.handle) > - return -EIO; > + if (result != 0) { > + return result; > + } > > - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len) > - return -EIO; > + QLIST_INIT(&s->outstanding_aio_head); > + > + /* establish TCP connection, return error if it fails > + * TODO: Configurable retry-until-timeout behaviour. > + */ > + result = nbd_establish_connection(bs); > + if (result != 0) { > + return result; > + } > > return 0; > } > > -static int nbd_write(BlockDriverState *bs, int64_t sector_num, > - const uint8_t *buf, int nb_sectors) > +static void nbd_close(BlockDriverState *bs) > { > + nbd_teardown_connection(bs); > BDRVNBDState *s = bs->opaque; > + > + if (s->export_name != NULL) { > + qemu_free(s->export_name); > + } > + if (s->host_spec != NULL) { > + qemu_free(s->host_spec); > + } > + > + return; > +} > + > +static int add_aio_request(BDRVNBDState *s, AIOReq *aio_req, QEMUIOVector > *qiov, > + enum AIOCBState aiocb_type) > +{ > struct nbd_request request; > - struct nbd_reply reply; > > - request.type = NBD_CMD_WRITE; > - request.handle = (uint64_t)(intptr_t)bs; > - request.from = sector_num * 512;; > - request.len = nb_sectors * 512; > + request.from = aio_req->offset; > + request.len = aio_req->data_len; > + request.handle = aio_req->handle; > + > + if (aiocb_type == AIOCB_READ_UDATA) { > + request.type = NBD_CMD_READ; > + } else { > + request.type = NBD_CMD_WRITE; > + } > > - if (nbd_send_request(s->sock, &request) == -1) > + /* Write the request to the socket. Header first. */ > + if (nbd_send_request(s->sock, &request) == -1) { > + /* TODO: retry handling. 
This leads to -EIO and request cancellation > */ > + logout("writing request header to server failed\n"); > return -errno; > + } > > - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len) > - return -EIO; > + /* If this is a write, send the data too */ > + if (aiocb_type == AIOCB_WRITE_UDATA) { > + int ret = 0; > + off_t offset = 0; > + size_t total = aio_req->data_len; > + > + while (offset < total) { > + ret = nbd_wr_aio(s->sock, qiov->iov, total - offset, > + offset + aio_req->iov_offset, false); > + if (ret == -1) { > + logout("Error writing request data to NBD server: %i (%s)\n", > + errno, strerror(errno)); > + return -EIO; -errno Or maybe nbd_wr_aio should already return -errno so that you can return ret here. > + } > > - if (nbd_receive_reply(s->sock, &reply) == -1) > - return -errno; > + offset += ret; > + } > + } > > - if (reply.error !=0) > - return -reply.error; > + return 0; > +} > + > +static inline AIOReq *alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb, > + size_t data_len, > + off_t offset, > + off_t iov_offset) > +{ > + AIOReq *aio_req; > > - if (reply.handle != request.handle) > + aio_req = qemu_malloc(sizeof(*aio_req)); > + aio_req->aiocb = acb; > + aio_req->iov_offset = iov_offset; > + aio_req->offset = offset; > + aio_req->data_len = data_len; > + aio_req->handle = s->aioreq_seq_num++; /* FIXME: Trivially guessable */ > + > + QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req, > + outstanding_aio_siblings); > + QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings); > + > + return aio_req; > +} > + > +static void nbd_finish_aiocb(NBDAIOCB *acb) > +{ > + if (!acb->canceled) { > + acb->common.cb(acb->common.opaque, acb->ret); > + } > + qemu_aio_release(acb); > +} > + > + > +static void nbd_aio_cancel(BlockDriverAIOCB *blockacb) > +{ > + NBDAIOCB *acb = (NBDAIOCB *)blockacb; > + > + /* > + * We cannot cancel the requests which are already sent to > + * the servers, so we just complete the request with -EIO here. 
> + */ > + acb->common.cb(acb->common.opaque, -EIO); > + acb->canceled = 1; > +} I think you need to check for acb->canceled before you write to the associated buffer when receiving the reply for a read request. The buffer might not exist any more after the request is cancelled. > + > +static AIOPool nbd_aio_pool = { > + .aiocb_size = sizeof(NBDAIOCB), > + .cancel = nbd_aio_cancel, > +}; > + > +static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, > + int64_t sector_num, int nb_sectors, > + BlockDriverCompletionFunc *cb, void > *opaque) > +{ > + NBDAIOCB *acb; > + > + acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque); > + > + acb->qiov = qiov; > + > + acb->sector_num = sector_num; > + acb->nb_sectors = nb_sectors; > + > + acb->aio_done_func = NULL; > + acb->canceled = 0; > + acb->bh = NULL; > + acb->ret = 0; > + QLIST_INIT(&acb->aioreq_head); > + return acb; > +} > + > +static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb) > +{ > + if (acb->bh) { > + logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type); > return -EIO; > + } > + > + acb->bh = qemu_bh_new(cb, acb); > + if (!acb->bh) { > + logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type); > + return -EIO; > + } > + > + qemu_bh_schedule(acb->bh); > > return 0; > } > > -static void nbd_close(BlockDriverState *bs) > +/* > + * Send I/O requests to the server. > + * > + * This function sends requests to the server, links the requests to > + * the outstanding_list in BDRVNBDState, and exits without waiting for > + * the response. The responses are received in the `aio_read_response' > + * function which is called from the main loop as a fd handler. 
> + * If this is a write request and it's >1MB, split it into multiple AIOReqs > + */ > +static void nbd_readv_writev_bh_cb(void *p) > { > - BDRVNBDState *s = bs->opaque; > - struct nbd_request request; > + NBDAIOCB *acb = p; > + int ret = 0; > > - request.type = NBD_CMD_DISC; > - request.handle = (uint64_t)(intptr_t)bs; > - request.from = 0; > - request.len = 0; > - nbd_send_request(s->sock, &request); > + size_t len, done = 0; > + size_t total = acb->nb_sectors * SECTOR_SIZE; > + > + /* Where the read/write starts from */ > + size_t offset = acb->sector_num * SECTOR_SIZE; > + BDRVNBDState *s = acb->common.bs->opaque; > + > + AIOReq *aio_req; > > - close(s->sock); > + qemu_bh_delete(acb->bh); > + acb->bh = NULL; > + > + while (done != total) { > + len = (total - done); > + > + /* Split write requests into 1MiB segments */ > + if(acb->aiocb_type == AIOCB_WRITE_UDATA && len > MAX_NBD_WRITE) { > + len = MAX_NBD_WRITE; > + } > + > + aio_req = alloc_aio_req(s, acb, len, offset + done, done); > + ret = add_aio_request(s, aio_req, acb->qiov, acb->aiocb_type); > + > + if (ret < 0) { > + free_aio_req(s, aio_req); > + acb->ret = -EIO; > + goto out; > + } > + > + done += len; > + } > +out: > + if (QLIST_EMPTY(&acb->aioreq_head)) { > + nbd_finish_aiocb(acb); > + } > } > > +static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs, > + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, > + BlockDriverCompletionFunc *cb, void *opaque) > +{ > + NBDAIOCB *acb; > + int i; > + > + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); > + acb->aiocb_type = AIOCB_READ_UDATA; > + acb->aio_done_func = nbd_finish_aiocb; > + > + for (i = 0; i < qiov->niov; i++) { > + memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len); > + } qemu_iovec_memset? What is this even for? Aren't these zeros overwritten anyway? 
> + > + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb); > + return &acb->common; > +} > + > +static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs, > + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, > + BlockDriverCompletionFunc *cb, void *opaque) > +{ > + NBDAIOCB *acb; > + > + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); > + acb->aiocb_type = AIOCB_WRITE_UDATA; > + acb->aio_done_func = nbd_finish_aiocb; > + > + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb); > + return &acb->common; > +} > + > + > static int64_t nbd_getlength(BlockDriverState *bs) > { > BDRVNBDState *s = bs->opaque; > - > return s->size; > } > > static BlockDriver bdrv_nbd = { > - .format_name = "nbd", > - .instance_size = sizeof(BDRVNBDState), > - .bdrv_file_open = nbd_open, > - .bdrv_read = nbd_read, > - .bdrv_write = nbd_write, > - .bdrv_close = nbd_close, > - .bdrv_getlength = nbd_getlength, > - .protocol_name = "nbd", > + .format_name = "nbd", > + .instance_size = sizeof(BDRVNBDState), > + .bdrv_file_open = nbd_open, > + .bdrv_aio_readv = nbd_aio_readv, > + .bdrv_aio_writev = nbd_aio_writev, > + .bdrv_close = nbd_close, > + .bdrv_getlength = nbd_getlength, > + .protocol_name = "nbd" > }; > > static void bdrv_nbd_init(void) Kevin