On Fri, Jun 27, 2014 at 11:24:08AM +0300, Chrysostomos Nanakos wrote: > VM Image on Archipelago volume is specified like this: > > file.driver=archipelago,file.volume=<volumename>[,file.mport=<mapperd_port>[, > file.vport=<vlmcd_port>][,file.segment=<segment_name>]] > > 'archipelago' is the protocol. > > 'mport' is the port number on which mapperd is listening. This is optional > and if not specified, QEMU will make Archipelago to use the default port. > > 'vport' is the port number on which vlmcd is listening. This is optional > and if not specified, QEMU will make Archipelago to use the default port. > > 'segment' is the name of the shared memory segment Archipelago stack is using. > This is optional and if not specified, QEMU will make Archipelago to use the > default value, 'archipelago'. > > Examples: > > file.driver=archipelago,file.volume=my_vm_volume > file.driver=archipelago,file.volume=my_vm_volume,file.mport=123 > file.driver=archipelago,file.volume=my_vm_volume,file.mport=123, > file.vport=1234 > file.driver=archipelago,file.volume=my_vm_volume,file.mport=123, > file.vport=1234,file.segment=my_segment > > Signed-off-by: Chrysostomos Nanakos <cnana...@grnet.gr>
This is just a superficial review, because I don't have a good idea of what archipelago or libxseg really does (I didn't even compile it or these patches). But I scanned through this patch, and found a few things, and had a few questions. > --- > MAINTAINERS | 6 + > block/Makefile.objs | 2 + > block/archipelago.c | 819 > +++++++++++++++++++++++++++++++++++++++++++++++++++ > configure | 40 +++ > 4 files changed, 867 insertions(+) > create mode 100644 block/archipelago.c > > diff --git a/MAINTAINERS b/MAINTAINERS > index 9b93edd..58ef1e3 100644 > --- a/MAINTAINERS > +++ b/MAINTAINERS > @@ -999,3 +999,9 @@ SSH > M: Richard W.M. Jones <rjo...@redhat.com> > S: Supported > F: block/ssh.c > + > +ARCHIPELAGO > +M: Chrysostomos Nanakos <cnana...@grnet.gr> > +M: Chrysostomos Nanakos <ch...@include.gr> > +S: Maintained > +F: block/archipelago.c > diff --git a/block/Makefile.objs b/block/Makefile.objs > index fd88c03..858d2b3 100644 > --- a/block/Makefile.objs > +++ b/block/Makefile.objs > @@ -17,6 +17,7 @@ block-obj-$(CONFIG_LIBNFS) += nfs.o > block-obj-$(CONFIG_CURL) += curl.o > block-obj-$(CONFIG_RBD) += rbd.o > block-obj-$(CONFIG_GLUSTERFS) += gluster.o > +block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o > block-obj-$(CONFIG_LIBSSH2) += ssh.o > endif > > @@ -35,5 +36,6 @@ gluster.o-cflags := $(GLUSTERFS_CFLAGS) > gluster.o-libs := $(GLUSTERFS_LIBS) > ssh.o-cflags := $(LIBSSH2_CFLAGS) > ssh.o-libs := $(LIBSSH2_LIBS) > +archipelago.o-libs := $(ARCHIPELAGO_LIBS) > qcow.o-libs := -lz > linux-aio.o-libs := -laio > diff --git a/block/archipelago.c b/block/archipelago.c > new file mode 100644 > index 0000000..c56826a > --- /dev/null > +++ b/block/archipelago.c > @@ -0,0 +1,819 @@ > +/* > + * QEMU Block driver for Archipelago > + * > + * Copyright 2014 GRNET S.A. All rights reserved. > + * > + * Redistribution and use in source and binary forms, with or > + * without modification, are permitted provided that the following > + * conditions are met: > + * > + * 1. 
Redistributions of source code must retain the above > + * copyright notice, this list of conditions and the following > + * disclaimer. > + * 2. Redistributions in binary form must reproduce the above > + * copyright notice, this list of conditions and the following > + * disclaimer in the documentation and/or other materials > + * provided with the distribution. > + * > + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS > + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED > + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR > + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR > + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF > + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED > + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT > + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN > + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE > + * POSSIBILITY OF SUCH DAMAGE. > + * > + * The views and conclusions contained in the software and > + * documentation are those of the authors and should not be > + * interpreted as representing official policies, either expressed > + * or implied, of GRNET S.A. > + */ > + > +/* > +* VM Image on Archipelago volume is specified like this: > +* > +* > file.driver=archipelago,file.volume=<volumename>[,file.mport=<mapperd_port>[, > +* file.vport=<vlmcd_port>][,file.segment=<segment_name>]] > +* > +* 'archipelago' is the protocol. > +* > +* 'mport' is the port number on which mapperd is listening. This is optional > +* and if not specified, QEMU will make Archipelago to use the default port. > +* > +* 'vport' is the port number on which vlmcd is listening. This is optional > +* and if not specified, QEMU will make Archipelago to use the default port. 
> +* > +* 'segment' is the name of the shared memory segment Archipelago stack is > using. > +* This is optional and if not specified, QEMU will make Archipelago to use > the > +* default value, 'archipelago'. > +* > +* Examples: > +* > +* file.driver=archipelago,file.volume=my_vm_volume > +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123 > +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123, > +* file.vport=1234 > +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123, > +* file.vport=1234,file.segment=my_segment > +*/ > + > +#include "block/block_int.h" > +#include "qemu/error-report.h" > +#include "qemu/thread.h" > +#include "qapi/qmp/qint.h" > +#include "qapi/qmp/qstring.h" > +#include "qapi/qmp/qjson.h" > + > +#include <inttypes.h> > +#include <xseg/xseg.h> > +#include <xseg/protocol.h> > + > +#define ARCHIP_FD_READ 0 > +#define ARCHIP_FD_WRITE 1 > +#define MAX_REQUEST_SIZE 524288 > + > +#define ARCHIPELAGO_OPT_VOLUME "volume" > +#define ARCHIPELAGO_OPT_SEGMENT "segment" > +#define ARCHIPELAGO_OPT_MPORT "mport" > +#define ARCHIPELAGO_OPT_VPORT "vport" > + > +#define archipelagolog(fmt, ...) \ > + do { \ > + fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, > ##__VA_ARGS__); \ > + } while (0) > + > +typedef enum { > + ARCHIP_OP_READ, > + ARCHIP_OP_WRITE, > + ARCHIP_OP_FLUSH, > + ARCHIP_OP_VOLINFO, > +} ARCHIPCmd; > + > +typedef struct ArchipelagoAIOCB { > + BlockDriverAIOCB common; > + QEMUBH *bh; > + struct BDRVArchipelagoState *s; > + QEMUIOVector *qiov; > + void *buffer; > + ARCHIPCmd cmd; > + bool cancelled; > + int status; > + int64_t size; > + int64_t ret; > +} ArchipelagoAIOCB; > + > +typedef struct BDRVArchipelagoState { > + ArchipelagoAIOCB *event_acb; > + char *volname; > + char *segment_name; > + uint64_t size; > + /* Archipelago specific */ > + struct xseg *xseg; I assume s->xseg is allocated in xseg_join() - is it ever freed? 
In _close(), there is a final call to xseg_leave(s->xseg), but from what I found in libxseg, it does not appear to be freed: https://github.com/cnanakos/libxseg/blob/develop/src/xseg.c#L975 Is it up to libxseg to free xseg, or the caller? > + struct xseg_port *port; > + xport srcport; > + xport sport; > + xport mportno; > + xport vportno; > + QemuMutex archip_mutex; > + QemuCond archip_cond; > + bool is_signaled; > + /* Request handler specific */ > + QemuThread request_th; > + QemuCond request_cond; > + QemuMutex request_mutex; > + bool th_is_signaled; > + bool stopping; > +} BDRVArchipelagoState; > + > +typedef struct ArchipelagoSegmentedRequest { > + size_t count; > + size_t total; > + int ref; > + int failed; > +} ArchipelagoSegmentedRequest; > + > +typedef struct AIORequestData { > + const char *volname; > + off_t offset; > + size_t size; > + uint64_t bufidx; > + int ret; > + int op; > + ArchipelagoAIOCB *aio_cb; > + ArchipelagoSegmentedRequest *segreq; > +} AIORequestData; > + > +static void qemu_archipelago_complete_aio(void *opaque); > + > +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport) > +{ > + if (xseg && (sport != srcport)) { > + xseg_init_local_signal(xseg, srcport); > + sport = srcport; > + } > +} > + > +static void archipelago_finish_aiocb(AIORequestData *reqdata) > +{ > + if (reqdata->aio_cb->ret != reqdata->segreq->total) { > + reqdata->aio_cb->ret = -EIO; > + } else if (reqdata->aio_cb->ret == reqdata->segreq->total) { > + reqdata->aio_cb->ret = 0; > + } > + reqdata->aio_cb->bh = aio_bh_new( > + bdrv_get_aio_context(reqdata->aio_cb->common.bs), > + qemu_archipelago_complete_aio, reqdata > + ); > + qemu_bh_schedule(reqdata->aio_cb->bh); > +} > + > +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port > *port, > + struct xseg_request *expected_req) > +{ > + struct xseg_request *req; > + xseg_prepare_wait(xseg, srcport); > + void *psd = xseg_get_signal_desc(xseg, port); > + while (1) { > + req = 
xseg_receive(xseg, srcport, 0); > + if (req) { > + if (req != expected_req) { > + archipelagolog("Unknown received request\n"); > + xseg_put_request(xseg, req, srcport); > + } else if (!(req->state & XS_SERVED)) { > + return -1; > + } else { > + break; > + } > + } > + xseg_wait_signal(xseg, psd, 100000UL); > + } > + xseg_cancel_wait(xseg, srcport); > + return 0; > +} > + > +static void xseg_request_handler(void *state) > +{ > + BDRVArchipelagoState *s = (BDRVArchipelagoState *) state; > + void *psd = xseg_get_signal_desc(s->xseg, s->port); > + qemu_mutex_lock(&s->request_mutex); > + > + while (!s->stopping) { > + struct xseg_request *req; > + void *data; > + xseg_prepare_wait(s->xseg, s->srcport); > + req = xseg_receive(s->xseg, s->srcport, 0); Is this a blocking call? If so, is there a timeout, and if not, what scenarios (if any) could cause us to wait here indefinitely? > + if (req) { > + AIORequestData *reqdata; > + ArchipelagoSegmentedRequest *segreq; > + xseg_get_req_data(s->xseg, req, (void **)&reqdata); > + > + switch (reqdata->op) { > + case ARCHIP_OP_READ: > + data = xseg_get_data(s->xseg, req); > + segreq = reqdata->segreq; > + segreq->count += req->serviced; > + > + qemu_iovec_from_buf(reqdata->aio_cb->qiov, > reqdata->bufidx, > + data, > + req->serviced); > + > + xseg_put_request(s->xseg, req, s->srcport); > + > + if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) { > + if (!segreq->failed) { > + reqdata->aio_cb->ret = segreq->count; > + archipelago_finish_aiocb(reqdata); > + g_free(segreq); > + } else { > + g_free(segreq); > + g_free(reqdata); > + } > + } else { > + g_free(reqdata); > + } > + break; > + case ARCHIP_OP_WRITE: > + segreq = reqdata->segreq; > + segreq->count += req->serviced; > + xseg_put_request(s->xseg, req, s->srcport); > + > + if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) { > + if (!segreq->failed) { > + reqdata->aio_cb->ret = segreq->count; > + archipelago_finish_aiocb(reqdata); > + g_free(segreq); > + } else { > + 
g_free(segreq); > + g_free(reqdata); > + } > + } else { > + g_free(reqdata); > + This (OP_WRITE / OP_READ) is where I am worried that we leak in error cases, and a _close() won't clean it up (see later comments). > + break; > + case ARCHIP_OP_VOLINFO: > + s->is_signaled = true; > + qemu_cond_signal(&s->archip_cond); > + break; > + } > + } else { > + xseg_wait_signal(s->xseg, psd, 100000UL); > + } > + xseg_cancel_wait(s->xseg, s->srcport); > + } > + > + s->th_is_signaled = true; > + qemu_cond_signal(&s->request_cond); > + qemu_mutex_unlock(&s->request_mutex); > + qemu_thread_exit(NULL); > +} > + > +static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s) > +{ > + if (xseg_initialize()) { > + archipelagolog("Cannot initialize XSEG\n"); > + goto err_exit; > + } > + > + s->xseg = xseg_join((char *)"posix", s->segment_name, > + (char *)"posixfd", NULL); > + if (!s->xseg) { > + archipelagolog("Cannot join XSEG shared memory segment\n"); > + goto err_exit; > + } > + s->port = xseg_bind_dynport(s->xseg); > + s->srcport = s->port->portno; > + init_local_signal(s->xseg, s->sport, s->srcport); > + return 0; > + > +err_exit: > + return -1; > +} > + > +static int qemu_archipelago_init(BDRVArchipelagoState *s) > +{ > + int ret; > + > + ret = qemu_archipelago_xseg_init(s); > + if (ret < 0) { > + error_report("Cannot initialize XSEG. 
Aborting...\n"); > + goto err_exit; > + } > + > + qemu_cond_init(&s->archip_cond); > + qemu_mutex_init(&s->archip_mutex); > + qemu_cond_init(&s->request_cond); > + qemu_mutex_init(&s->request_mutex); > + s->th_is_signaled = false; > + qemu_thread_create(&s->request_th, "xseg_io_th", > + (void *) xseg_request_handler, > + (void *) s, QEMU_THREAD_JOINABLE); > + > +err_exit: > + return ret; > +} > + > +static void qemu_archipelago_complete_aio(void *opaque) > +{ > + AIORequestData *reqdata = (AIORequestData *) opaque; > + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb; > + > + qemu_bh_delete(aio_cb->bh); > + qemu_vfree(aio_cb->buffer); > + aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret); > + aio_cb->status = 0; > + > + if (!aio_cb->cancelled) { > + qemu_aio_release(aio_cb); > + } > + g_free(reqdata); > +} > + > +static QemuOptsList archipelago_runtime_opts = { > + .name = "archipelago", > + .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head), > + .desc = { > + { > + .name = ARCHIPELAGO_OPT_VOLUME, > + .type = QEMU_OPT_STRING, > + .help = "Name of the volume image", > + }, > + { > + .name = ARCHIPELAGO_OPT_SEGMENT, > + .type = QEMU_OPT_STRING, > + .help = "Name of the Archipelago shared memory segment", > + }, > + { > + .name = ARCHIPELAGO_OPT_MPORT, > + .type = QEMU_OPT_NUMBER, > + .help = "Archipelago mapperd port number" > + }, > + { > + .name = ARCHIPELAGO_OPT_VPORT, > + .type = QEMU_OPT_NUMBER, > + .help = "Archipelago vlmcd port number" > + > + }, > + { /* end of list */ } > + }, > +}; > + > +static int qemu_archipelago_open(BlockDriverState *bs, > + QDict *options, > + int bdrv_flags, > + Error **errp) > +{ > + int ret = 0; > + const char *volume, *segment_name; > + QemuOpts *opts; > + Error *local_err = NULL; > + BDRVArchipelagoState *s = bs->opaque; > + > + opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, > &error_abort); > + qemu_opts_absorb_qdict(opts, options, &local_err); > + if (local_err) { > + 
error_propagate(errp, local_err); > + qemu_opts_del(opts); > + return -EINVAL; > + } > + > + s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001); > + s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501); > + > + segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT); > + if (segment_name == NULL) { > + s->segment_name = g_strdup("archipelago"); > + } else { > + s->segment_name = g_strdup(segment_name); > + } > + > + volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME); > + if (volume == NULL) { > + error_setg(errp, "archipelago block driver requires the 'volume'" > + " option"); > + qemu_opts_del(opts); > + return -EINVAL; s->segment_name is leaked here. You already have an exit label (err_exit) that cleans everything up, and g_free() is NULL safe (and bs->opaque is zero-initialized). You should be able to just set ret, and 'goto err_exit' in each error instance in qemu_archipelago_open() - this also gets rid of the extra qemu_opts_del() calls. > + } > + s->volname = g_strdup(volume); > + > + /* Initialize XSEG, join shared memory segment */ > + ret = qemu_archipelago_init(s); > + if (ret < 0) { > + error_setg(errp, "cannot initialize XSEG and join shared " > + "memory segment"); > + goto err_exit; > + } > + > + qemu_opts_del(opts); > + return 0; > + > +err_exit: > + g_free(s->volname); > + g_free(s->segment_name); > + qemu_opts_del(opts); > + return ret; > +} > + > +static void qemu_archipelago_close(BlockDriverState *bs) > +{ > + int r, targetlen; > + char *target; > + struct xseg_request *req; > + BDRVArchipelagoState *s = bs->opaque; > + > + s->stopping = true; > + > + qemu_mutex_lock(&s->request_mutex); > + while (!s->th_is_signaled) { > + qemu_cond_wait(&s->request_cond, > + &s->request_mutex); > + } > + qemu_mutex_unlock(&s->request_mutex); > + qemu_thread_join(&s->request_th); > + qemu_cond_destroy(&s->request_cond); > + qemu_mutex_destroy(&s->request_mutex); > + > + qemu_cond_destroy(&s->archip_cond); > + 
qemu_mutex_destroy(&s->archip_mutex); > + > + targetlen = strlen(s->volname); Should this be strlen(s->volname) + 1, to account for the '\0'? Or does xseg_prep_request() just need the length of the string without its null terminator? > + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC); > + if (!req) { > + archipelagolog("Cannot get XSEG request\n"); > + goto err_exit; > + } > + r = xseg_prep_request(s->xseg, req, targetlen, 0); > + if (r < 0) { > + xseg_put_request(s->xseg, req, s->srcport); What does this do here if xseg_prep_request() failed? Is it essentially a cleanup function? > + archipelagolog("Cannot prepare XSEG close request\n"); > + goto err_exit; > + } > + > + target = xseg_get_target(s->xseg, req); > + memcpy(target, s->volname, targetlen); > + req->size = req->datalen; > + req->offset = 0; > + req->op = X_CLOSE; > + > + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC); > + if (p == NoPort) { > + xseg_put_request(s->xseg, req, s->srcport); > + archipelagolog("Cannot submit XSEG close request\n"); > + goto err_exit; > + } > + > + xseg_signal(s->xseg, p); > + wait_reply(s->xseg, s->srcport, s->port, req); This is another spot where I wonder whether we could get stuck on a blocking call that could potentially wait forever... is there a timeout here?
> + > + xseg_put_request(s->xseg, req, s->srcport); > + > +err_exit: > + g_free(s->volname); > + g_free(s->segment_name); > + xseg_quit_local_signal(s->xseg, s->srcport); > + xseg_leave_dynport(s->xseg, s->port); > + xseg_leave(s->xseg); > +} > + > +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb) > +{ > + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb; > + aio_cb->cancelled = true; > + while (aio_cb->status == -EINPROGRESS) { > + qemu_aio_wait(); > + } > + qemu_aio_release(aio_cb); > +} > + > +static const AIOCBInfo archipelago_aiocb_info = { > + .aiocb_size = sizeof(ArchipelagoAIOCB), > + .cancel = qemu_archipelago_aio_cancel, > +}; > + > +static int __archipelago_submit_request(BDRVArchipelagoState *s, > + uint64_t bufidx, > + size_t count, > + off_t offset, > + ArchipelagoAIOCB *aio_cb, > + ArchipelagoSegmentedRequest *segreq, > + int op) > +{ > + int ret, targetlen; > + char *target; > + void *data = NULL; > + struct xseg_request *req; > + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData)); > + > + targetlen = strlen(s->volname); > + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC); > + if (!req) { > + archipelagolog("Cannot get XSEG request\n"); > + goto err_exit2; > + } > + ret = xseg_prep_request(s->xseg, req, targetlen, count); > + if (ret < 0) { > + archipelagolog("Cannot prepare XSEG request\n"); > + goto err_exit; > + } > + target = xseg_get_target(s->xseg, req); > + if (!target) { > + archipelagolog("Cannot get XSEG target\n"); > + goto err_exit; > + } > + memcpy(target, s->volname, targetlen); > + req->size = count; > + req->offset = offset; > + > + switch (op) { > + case ARCHIP_OP_READ: > + req->op = X_READ; > + break; > + case ARCHIP_OP_WRITE: > + req->op = X_WRITE; > + break; > + } > + reqdata->volname = s->volname; > + reqdata->offset = offset; > + reqdata->size = count; > + reqdata->bufidx = bufidx; > + reqdata->aio_cb = aio_cb; > + reqdata->segreq = segreq; > + reqdata->op = op; > + > + 
xseg_set_req_data(s->xseg, req, reqdata); > + if (op == ARCHIP_OP_WRITE) { > + data = xseg_get_data(s->xseg, req); > + if (!data) { > + archipelagolog("Cannot get XSEG data\n"); > + goto err_exit; > + } > + memcpy(data, aio_cb->buffer + bufidx, count); > + } > + > + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC); > + if (p == NoPort) { > + archipelagolog("Could not submit XSEG request\n"); > + goto err_exit; > + } > + xseg_signal(s->xseg, p); > + return 0; > + > +err_exit: > + g_free(reqdata); > + xseg_put_request(s->xseg, req, s->srcport); > + return -EIO; > +err_exit2: > + g_free(reqdata); > + return -EIO; > +} > + > +static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s, > + size_t count, > + off_t offset, > + ArchipelagoAIOCB *aio_cb, > + int op) > +{ > + int i, ret, segments_nr, last_segment_size; > + ArchipelagoSegmentedRequest *segreq; > + > + segreq = g_malloc(sizeof(ArchipelagoSegmentedRequest)); > + > + if (op == ARCHIP_OP_FLUSH) { > + segments_nr = 1; > + segreq->ref = segments_nr; > + segreq->total = count; > + segreq->count = 0; > + segreq->failed = 0; > + ret = __archipelago_submit_request(s, 0, count, offset, aio_cb, > + segreq, ARCHIP_OP_WRITE); > + if (ret < 0) { > + goto err_exit; > + } > + return 0; > + } > + > + segments_nr = (int)(count / MAX_REQUEST_SIZE) + \ > + ((count % MAX_REQUEST_SIZE) ? 
1 : 0); > + last_segment_size = (int)(count % MAX_REQUEST_SIZE); > + > + segreq->ref = segments_nr; > + segreq->total = count; > + segreq->count = 0; > + segreq->failed = 0; > + > + for (i = 0; i < segments_nr - 1; i++) { > + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE, > + MAX_REQUEST_SIZE, > + offset + i * MAX_REQUEST_SIZE, > + aio_cb, segreq, op); > + > + if (ret < 0) { > + goto err_exit; > + } > + } > + > + if ((segments_nr > 1) && last_segment_size) { > + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE, > + last_segment_size, > + offset + i * MAX_REQUEST_SIZE, > + aio_cb, segreq, op); > + } else if ((segments_nr > 1) && !last_segment_size) { > + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE, > + MAX_REQUEST_SIZE, > + offset + i * MAX_REQUEST_SIZE, > + aio_cb, segreq, op); > + } else if (segments_nr == 1) { > + ret = __archipelago_submit_request(s, 0, count, offset, aio_cb, > + segreq, op); > + } > + > + if (ret < 0) { > + goto err_exit; > + } > + > + return 0; > + > +err_exit: > + __sync_add_and_fetch(&segreq->failed, 1); > + if (segments_nr == 1) { > + if (__sync_add_and_fetch(&segreq->ref, -1) == 0) { > + g_free(segreq); > + } > + } else { > + if ((__sync_add_and_fetch(&segreq->ref, -segments_nr + i)) == 0) { > + g_free(segreq); > + } > + } Don't we run the risk of leaking segreq here? The other place it is freed is xseg_request_handler(), but could we run into a race condition where 's->stopping' is already true, or where xseg_receive() simply never returns one of the outstanding requests?
> + > + return ret; > +} > + > +static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs, > + int64_t sector_num, > + QEMUIOVector *qiov, > + int nb_sectors, > + BlockDriverCompletionFunc > *cb, > + void *opaque, > + int op) > +{ > + ArchipelagoAIOCB *aio_cb; > + BDRVArchipelagoState *s = bs->opaque; > + int64_t size, off; > + int ret; > + > + aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque); > + aio_cb->cmd = op; > + aio_cb->qiov = qiov; > + > + if (op != ARCHIP_OP_FLUSH) { > + aio_cb->buffer = qemu_blockalign(bs, qiov->size); > + } else { > + aio_cb->buffer = NULL; > + } > + > + aio_cb->ret = 0; > + aio_cb->s = s; > + aio_cb->cancelled = false; > + aio_cb->status = -EINPROGRESS; > + > + if (op == ARCHIP_OP_WRITE) { > + qemu_iovec_to_buf(aio_cb->qiov, 0, aio_cb->buffer, qiov->size); > + } > + > + off = sector_num * BDRV_SECTOR_SIZE; > + size = nb_sectors * BDRV_SECTOR_SIZE; > + aio_cb->size = size; > + > + ret = archipelago_aio_segmented_rw(s, size, off, > + aio_cb, op); > + if (ret < 0) { > + goto err_exit; > + } > + return &aio_cb->common; > + > +err_exit: > + error_report("qemu_archipelago_aio_rw(): I/O Error\n"); > + qemu_vfree(aio_cb->buffer); > + qemu_aio_release(aio_cb); > + return NULL; > +} > + > +static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs, > + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, > + BlockDriverCompletionFunc *cb, void *opaque) > +{ > + return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb, > + opaque, ARCHIP_OP_READ); > +} > + > +static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs, > + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, > + BlockDriverCompletionFunc *cb, void *opaque) > +{ > + return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb, > + opaque, ARCHIP_OP_WRITE); > +} > + > +static int64_t archipelago_volume_info(BDRVArchipelagoState *s) > +{ > + uint64_t size; > + int ret, targetlen; > + struct xseg_request 
*req; > + struct xseg_reply_info *xinfo; > + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData)); > + > + const char *volname = s->volname; > + targetlen = strlen(volname); > + req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC); > + if (!req) { > + archipelagolog("Cannot get XSEG request\n"); > + goto err_exit2; > + } > + ret = xseg_prep_request(s->xseg, req, targetlen, > + sizeof(struct xseg_reply_info)); > + if (ret < 0) { > + archipelagolog("Cannot prepare XSEG request\n"); > + goto err_exit; > + } > + char *target = xseg_get_target(s->xseg, req); > + if (!target) { > + archipelagolog("Cannot get XSEG target\n"); > + goto err_exit; > + } > + memcpy(target, volname, targetlen); > + req->size = req->datalen; > + req->offset = 0; > + req->op = X_INFO; > + > + reqdata->op = ARCHIP_OP_VOLINFO; > + reqdata->volname = volname; > + xseg_set_req_data(s->xseg, req, reqdata); > + > + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC); > + if (p == NoPort) { > + archipelagolog("Cannot submit XSEG request\n"); > + goto err_exit; > + } > + xseg_signal(s->xseg, p); > + qemu_mutex_lock(&s->archip_mutex); > + while (!s->is_signaled) { > + qemu_cond_wait(&s->archip_cond, &s->archip_mutex); > + } > + s->is_signaled = false; > + qemu_mutex_unlock(&s->archip_mutex); > + > + xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req); > + size = xinfo->size; > + xseg_put_request(s->xseg, req, s->srcport); > + g_free(reqdata); > + s->size = size; > + return size; > + > +err_exit: > + g_free(reqdata); > + xseg_put_request(s->xseg, req, s->srcport); > + return -1; > +err_exit2: > + g_free(reqdata); > + return -1; > +} This could be simplified to just the following (falling through from err_exit into err_exit2 performs the same cleanup without duplicating the g_free() call): err_exit: xseg_put_request(s->xseg, req, s->srcport); err_exit2: g_free(reqdata); return -1; } It would probably also be best to return -EIO (or another meaningful error value) instead of just -1, as this value is returned to callers through .bdrv_getlength().
> + > +static int64_t qemu_archipelago_getlength(BlockDriverState *bs) > +{ > + int64_t ret; > + BDRVArchipelagoState *s = bs->opaque; > + > + ret = archipelago_volume_info(s); (This is the spot I meant: an error value such as -EIO may be better than -1 here.) > + return ret; > +} > + > +static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs, > + BlockDriverCompletionFunc *cb, void *opaque) > +{ > + return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque, > + ARCHIP_OP_FLUSH); > +} > + > +static BlockDriver bdrv_archipelago = { > + .format_name = "archipelago", > + .protocol_name = "archipelago", > + .instance_size = sizeof(BDRVArchipelagoState), > + .bdrv_file_open = qemu_archipelago_open, > + .bdrv_close = qemu_archipelago_close, > + .bdrv_getlength = qemu_archipelago_getlength, > + .bdrv_aio_readv = qemu_archipelago_aio_readv, > + .bdrv_aio_writev = qemu_archipelago_aio_writev, > + .bdrv_aio_flush = qemu_archipelago_aio_flush, > + .bdrv_has_zero_init = bdrv_has_zero_init_1, > +}; > + > +static void bdrv_archipelago_init(void) > +{ > + bdrv_register(&bdrv_archipelago); > +} > + > +block_init(bdrv_archipelago_init); > diff --git a/configure b/configure > index 7102964..e4acd9c 100755 > --- a/configure > +++ b/configure > @@ -326,6 +326,7 @@ seccomp="" > glusterfs="" > glusterfs_discard="no" > glusterfs_zerofill="no" > +archipelago="" > virtio_blk_data_plane="" > gtk="" > gtkabi="" > @@ -1087,6 +1088,10 @@ for opt do > ;; > --enable-glusterfs) glusterfs="yes" > ;; > + --disable-archipelago) archipelago="no" > + ;; > + --enable-archipelago) archipelago="yes" > + ;; > --disable-virtio-blk-data-plane) virtio_blk_data_plane="no" > ;; > --enable-virtio-blk-data-plane) virtio_blk_data_plane="yes" > @@ -1382,6 +1387,8 @@ Advanced options (experts only): > --enable-coroutine-pool enable coroutine freelist (better performance) > --enable-glusterfs enable GlusterFS backend > --disable-glusterfs disable GlusterFS backend > + --enable-archipelago enable
Archipelago backend > + --disable-archipelago disable Archipelago backend > --enable-gcov enable test coverage analysis with gcov > --gcov=GCOV use specified gcov [$gcov_tool] > --disable-tpm disable TPM support > @@ -3051,6 +3058,33 @@ EOF > fi > fi > > + > +########################################## > +# archipelago probe > +if test "$archipelago" != "no" ; then > + cat > $TMPC <<EOF > +#include <stdio.h> > +#include <xseg/xseg.h> > +#include <xseg/protocol.h> > +int main(void) { > + xseg_initialize(); > + return 0; > +} > +EOF > + archipelago_libs=-lxseg > + if compile_prog "" "$archipelago_libs"; then > + archipelago="yes" > + libs_tools="$archipelago_libs $libs_tools" > + libs_softmmu="$archipelago_libs $libs_softmmu" > + else > + if test "$archipelago" = "yes" ; then > + feature_not_found "Archipelago backend support" "Install libxseg > devel" > + fi > + archipelago="no" > + fi > +fi > + > + > ########################################## > # glusterfs probe > if test "$glusterfs" != "no" ; then > @@ -4230,6 +4264,7 @@ echo "seccomp support $seccomp" > echo "coroutine backend $coroutine" > echo "coroutine pool $coroutine_pool" > echo "GlusterFS support $glusterfs" > +echo "Archipelago support $archipelago" > echo "virtio-blk-data-plane $virtio_blk_data_plane" > echo "gcov $gcov_tool" > echo "gcov enabled $gcov" > @@ -4665,6 +4700,11 @@ if test "$glusterfs_zerofill" = "yes" ; then > echo "CONFIG_GLUSTERFS_ZEROFILL=y" >> $config_host_mak > fi > > +if test "$archipelago" = "yes" ; then > + echo "CONFIG_ARCHIPELAGO=m" >> $config_host_mak > + echo "ARCHIPELAGO_LIBS=$archipelago_libs" >> $config_host_mak > +fi > + > if test "$libssh2" = "yes" ; then > echo "CONFIG_LIBSSH2=m" >> $config_host_mak > echo "LIBSSH2_CFLAGS=$libssh2_cflags" >> $config_host_mak > -- > 1.7.10.4 > >