On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote: > +typedef struct BDRVArchipelagoState { > + int fds[2]; > + int qemu_aio_count;
This field is never used. It's incremented and decremented but nothing ever checks the value. It can be dropped. > + int event_reader_pos; > + ArchipelagoAIOCB *event_acb; > + const char *volname; > + uint64_t size; > + /* Archipelago specific */ > + struct xseg *xseg; > + struct xseg_port *port; > + xport srcport; > + xport sport; > + xport mportno; > + xport vportno; > + QemuMutex archip_mutex; > + QemuCond archip_cond; > + bool is_signaled; > + /* Request handler specific */ > + QemuThread request_th; > + QemuCond request_cond; > + QemuMutex request_mutex; > + bool th_is_signaled; > + bool stopping; > +} BDRVArchipelagoState; > + > +typedef struct ArchipelagoSegmentedRequest { > + size_t count; > + size_t total; > + int ref; > + int failed; > +} ArchipelagoSegmentedRequest; > + > +typedef struct AIORequestData { > + const char *volname; > + off_t offset; > + size_t size; > + uint64_t bufidx; > + int ret; > + int op; > + ArchipelagoAIOCB *aio_cb; > + ArchipelagoSegmentedRequest *segreq; > +} AIORequestData; > + > + > +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb); > + > +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport) > +{ > + if (xseg && (sport != srcport)) { > + xseg_init_local_signal(xseg, srcport); > + sport = srcport; > + } > +} QEMU should clean up by calling xseg_quit_local_signal().
> + > +static void archipelago_finish_aiocb(AIORequestData *reqdata) > +{ > + int ret; > + ret = qemu_archipelago_signal_pipe(reqdata->aio_cb); > + if (ret < 0) { > + error_report("archipelago_finish_aiocb(): failed writing" > + " aio_cb->s->fds"); > + } > + g_free(reqdata); > +} > + > +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port, > + struct xseg_request *expected_req) > +{ > + struct xseg_request *req; > + xseg_prepare_wait(xseg, srcport); > + void *psd = xseg_get_signal_desc(xseg, port); > + while (1) { > + req = xseg_receive(xseg, srcport, 0); > + if (req) { > + if (req != expected_req) { > + archipelagolog("Unknown received request\n"); > + xseg_put_request(xseg, req, srcport); > + } else if (!(req->state & XS_SERVED)) { > + archipelagolog("Failed req\n"); > + return -1; > + } else { > + break; > + } > + } > + xseg_wait_signal(xseg, psd, 100000UL); > + } > + xseg_cancel_wait(xseg, srcport); > + return 0; > +} > + > +static void xseg_request_handler(void *state) > +{ This thread is only necessary because you're not integrating xseg into the QEMU event loop. If you got the pipe fds from xseg and used aio_set_fd_handler(), you could eliminate this thread. The advantage is that you could skip archipelago_finish_aiocb() entirely and get slightly better performance thanks to one fewer context switch between threads.
> + BDRVArchipelagoState *s = (BDRVArchipelagoState *) state; > + void *psd = xseg_get_signal_desc(s->xseg, s->port); > + qemu_mutex_lock(&s->request_mutex); > + > + while (!s->stopping) { > + struct xseg_request *req; > + char *data; > + xseg_prepare_wait(s->xseg, s->srcport); > + req = xseg_receive(s->xseg, s->srcport, 0); > + if (req) { > + AIORequestData *reqdata; > + ArchipelagoSegmentedRequest *segreq; > + xseg_get_req_data(s->xseg, req, (void **)&reqdata); > + > + if (!(req->state & XS_SERVED)) { > + segreq = reqdata->segreq; > + __sync_bool_compare_and_swap(&segreq->failed, 0, 1); > + } > + > + switch (reqdata->op) { > + case ARCHIP_OP_READ: > + data = xseg_get_data(s->xseg, req); > + segreq = reqdata->segreq; > + segreq->count += req->serviced; > + > + qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx, > + data, > + req->serviced); > + > + xseg_put_request(s->xseg, req, s->srcport); > + > + __sync_add_and_fetch(&segreq->ref, -1); > + > + if (segreq->ref == 0) { I'm not sure what __sync_add_and_fetch() buys you here, since the if statement then reads segreq->ref again. Testing its return value directly, e.g. if (__sync_add_and_fetch(&segreq->ref, -1) == 0), would avoid the separate second read. That said, I'm not reviewing the details of the shared memory accesses. I'm assuming this stuff is correct, secure, etc. > + if (!segreq->failed) { > + reqdata->aio_cb->ret = segreq->count; > + archipelago_finish_aiocb(reqdata); > + } What does segreq->failed mean? We should always complete the I/O request and report an error if it failed. Otherwise failed requests leak and the upper layers will eventually run out of resources.
> +static void parse_filename_opts(const char *filename, Error **errp, > + char **volume, xport *mport, xport *vport) > +{ > + const char *start; > + char *tokens[3], *ds; > + int idx; > + xport lmport = NoPort, lvport = NoPort; > + > + strstart(filename, "archipelago:", &start); > + > + ds = g_strdup(start); > + tokens[0] = strtok(ds, "/"); > + tokens[1] = strtok(NULL, ":"); > + tokens[2] = strtok(NULL, "\0"); > + > + if (!strlen(tokens[0])) { > + error_setg(errp, "volume name must be specified first"); > + return; ds is leaked. > + } > + > + for (idx = 1; idx < 3; idx++) { > + if (tokens[idx] != NULL) { > + if (strstart(tokens[idx], "mport=", NULL)) { > + xseg_find_port(tokens[idx], "mport=", &lmport); > + } > + if (strstart(tokens[idx], "vport=", NULL)) { > + xseg_find_port(tokens[idx], "vport=", &lvport); > + } > + } > + } > + > + if ((lmport == (xport) -2) || (lvport == (xport) -2)) { > + error_setg(errp, "Usage: file=archipelago:" > + "<volumename>[/mport=<mapperd_port>" > + "[:vport=<vlmcd_port>]]"); ds is leaked. 
> + return; > + } > + *volume = g_strdup(tokens[0]); > + *mport = lmport; > + *vport = lvport; > + g_free(ds); > +} > + > +static void archipelago_parse_filename(const char *filename, QDict *options, > + Error **errp) > +{ > + const char *start; > + char *volume = NULL; > + xport mport = NoPort, vport = NoPort; > + > + if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME) > + || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT) > + || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) { > + error_setg(errp, "volume/mport/vport and a file name may not be " > + "specified at the same time"); > + return; > + } > + > + if (!strstart(filename, "archipelago:", &start)) { > + error_setg(errp, "File name must start with 'archipelago:'"); > + return; > + } > + > + if (!strlen(start) || strstart(start, "/", NULL)) { > + error_setg(errp, "volume name must be specified"); > + return; > + } > + > + parse_filename_opts(filename, errp, &volume, &mport, &vport); > + > + if (volume) { > + qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume)); > + g_free(volume); > + } > + if (mport != NoPort) { > + qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport)); > + } > + if (vport != NoPort) { > + qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport)); > + } > +} > + > +static QemuOptsList archipelago_runtime_opts = { > + .name = "archipelago", > + .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head), > + .desc = { > + { > + .name = ARCHIPELAGO_OPT_VOLUME, > + .type = QEMU_OPT_STRING, > + .help = "Name of the volume image", > + }, > + { > + .name = ARCHIPELAGO_OPT_MPORT, > + .type = QEMU_OPT_NUMBER, > + .help = "Archipelago mapperd port number" > + }, > + { > + .name = ARCHIPELAGO_OPT_VPORT, > + .type = QEMU_OPT_NUMBER, > + .help = "Archipelago vlmcd port number" > + > + }, > + { /* end of list */ } > + }, > +}; > + > +static int qemu_archipelago_open(BlockDriverState *bs, > + QDict *options, > + int bdrv_flags, > + Error **errp) > +{ > + int ret = 0; > + const 
char *volume; > + QemuOpts *opts; > + Error *local_err = NULL; > + BDRVArchipelagoState *s = bs->opaque; > + > + opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort); > + qemu_opts_absorb_qdict(opts, options, &local_err); > + if (local_err) { > + error_propagate(errp, local_err); > + qemu_opts_del(opts); > + return -EINVAL; > + } > + > + s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001); > + s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501); > + > + volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME); > + if (volume == NULL) { > + error_setg(errp, "archipelago block driver requires an 'volume'" > + " options"); Suggested wording: "archipelago block driver requires the 'volume' option" > + error_propagate(errp, local_err); This line is unnecessary: local_err is always NULL at this point, and the error message has already been set in errp. > + qemu_opts_del(opts); > + return -EINVAL; > + } > + s->volname = g_strdup(volume); > + > + /* Initialize XSEG, join shared memory segment */ > + ret = qemu_archipelago_init(s); > + if (ret < 0) { > + error_setg(errp, "cannot initialize XSEG and join shared " > + "memory segment"); > + goto err_exit; > + } > + > + s->event_reader_pos = 0; > + ret = qemu_pipe(s->fds); > + if (ret < 0) { > + error_setg(errp, "cannot create pipe"); > + goto err_exit; Do we need to call xseg_leave() here to avoid leaking xseg refcounts, leaving memory mapped, and leaking memory?
> + } > + > + fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK); > + fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK); > + qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], > + qemu_archipelago_aio_event_reader, NULL, > + s); > + > + qemu_opts_del(opts); > + return 0; > + > +err_exit: > + qemu_opts_del(opts); > + return ret; s->volname is leaked. > +} > + > +static void qemu_archipelago_close(BlockDriverState *bs) > +{ > + int r, targetlen; > + char *target; > + struct xseg_request *req; > + BDRVArchipelagoState *s = bs->opaque; > + > + qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL); > + close(s->fds[0]); > + close(s->fds[1]); > + > + s->stopping = true; > + > + qemu_mutex_lock(&s->request_mutex); > + while (!s->th_is_signaled) { > + qemu_cond_wait(&s->request_cond, > + &s->request_mutex); > + } > + qemu_mutex_unlock(&s->request_mutex); > + qemu_cond_destroy(&s->request_cond); > + qemu_mutex_destroy(&s->request_mutex); It's not safe to call qemu_mutex_destroy() here because the other thread may still be inside qemu_mutex_unlock(&s->request_mutex) and may still access s->request_mutex memory. Use qemu_thread_join() before destroying request_cond and request_mutex. That way you can be sure there is no race condition. (I recently did the same thing and Paolo Bonzini pointed out the bug. After checking the glibc implementation I was convinced that it's not safe.)
> + > + qemu_cond_destroy(&s->archip_cond); > + qemu_mutex_destroy(&s->archip_mutex); > + > + targetlen = strlen(s->volname); > + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC); > + if (!req) { > + archipelagolog("Cannot get XSEG request\n"); > + goto err_exit; > + } > + r = xseg_prep_request(s->xseg, req, targetlen, 0); > + if (r < 0) { > + xseg_put_request(s->xseg, req, s->srcport); > + archipelagolog("Cannot prepare XSEG close request\n"); > + goto err_exit; > + } > + > + target = xseg_get_target(s->xseg, req); > + strncpy(target, s->volname, targetlen); Using strncpy() hints that target is a NUL-terminated string when in fact it's not. I think memcpy() would be clearer here. Since targetlen is strlen(s->volname), no '\0' byte is copied anyway, and you don't want one at the end of the target. Or maybe I'm wrong and there is some guarantee that there will be a '\0' byte after target? > + req->size = req->datalen; > + req->offset = 0; > + req->op = X_CLOSE; > + > + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC); > + if (p == NoPort) { > + xseg_put_request(s->xseg, req, s->srcport); > + archipelagolog("Cannot submit XSEG close request\n"); > + goto err_exit; > + } > + > + xseg_signal(s->xseg, p); > + r = wait_reply(s->xseg, s->srcport, s->port, req); > + if (r < 0) { > + archipelagolog("wait_reply() error\n"); > + } > + if (!(req->state & XS_SERVED)) { > + archipelagolog("Could no close map for volume '%s'\n", s->volname); > + } > + > + xseg_put_request(s->xseg, req, s->srcport); > + > +err_exit: > + xseg_leave_dynport(s->xseg, s->port); > + xseg_leave(s->xseg); s->volname is leaked.
> +} > + > +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb) > +{ > + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb; > + aio_cb->cancelled = true; > + while (aio_cb->status == -EINPROGRESS) { > + qemu_aio_wait(); > + } > + qemu_aio_release(aio_cb); > +} > + > +static const AIOCBInfo archipelago_aiocb_info = { > + .aiocb_size = sizeof(ArchipelagoAIOCB), > + .cancel = qemu_archipelago_aio_cancel, > +}; > + > +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb) > +{ > + int ret = 0; > + while (1) { > + fd_set wfd; > + int fd = aio_cb->s->fds[1]; > + > + ret = write(fd, (void *)&aio_cb, sizeof(aio_cb)); > + if (ret > 0) { > + break; > + } > + if (errno == EINTR) { > + continue; > + } > + if (errno != EAGAIN) { > + break; > + } > + FD_ZERO(&wfd); > + FD_SET(fd, &wfd); > + do { > + ret = select(fd + 1, NULL, &wfd, NULL, NULL); > + } while (ret < 0 && errno == EINTR); > + } > + return ret; > +} A newer signalling approach is available and will let you drop the pipe code. QEMUBH is a "bottom half", or deferred function call, that can be scheduled in an event loop. Scheduling the QEMUBH is thread-safe, so you can do it from any thread. See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH. > +static int64_t archipelago_volume_info(BDRVArchipelagoState *s) > +{ > + uint64_t size; > + int ret, targetlen; > + struct xseg_request *req; > + struct xseg_reply_info *xinfo; > + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData)); > + > + if (!reqdata) { > + archipelagolog("Cannot allocate reqdata\n"); > + return -1; g_malloc() never returns NULL because it terminates the program if the allocation fails, so this if statement can be dropped.
pgpkocNjtzPEW.pgp
Description: PGP signature