On Fri, Jun 05, 2020 at 07:35:36AM +0800, Coiby Xu wrote:
> +static bool coroutine_fn
> +vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
> +{
> +    struct iovec iov = {
> +        .iov_base = (char *)vmsg,
> +        .iov_len = VHOST_USER_HDR_SIZE,
> +    };
> +    int rc, read_bytes = 0;
> +    Error *local_err = NULL;
> +    /*
> +     * Store fds/nfds returned from qio_channel_readv_full into
> +     * temporary variables.
> +     *
> +     * VhostUserMsg is a packed structure, gcc will complain about passing
> +     * pointer to a packed structure member if we pass &VhostUserMsg.fd_num
> +     * and &VhostUserMsg.fds directly when calling qio_channel_readv_full,
> +     * thus two temporary variables nfds and fds are used here.
> +     */
> +    size_t nfds = 0, nfds_t = 0;
> +    int *fds = NULL, *fds_t = NULL;
> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> +    QIOChannel *ioc = NULL;
> +
> +    if (conn_fd == server->sioc->fd) {
> +        ioc = server->ioc;
> +    } else {
> +        /* Slave communication will also use this function to read msg */
> +        ioc = slave_io_channel(server, conn_fd, &local_err);
> +    }
> +
> +    if (!ioc) {
> +        error_report_err(local_err);
> +        goto fail;
> +    }
> +
> +    assert(qemu_in_coroutine());
> +    do {
> +        /*
> +         * qio_channel_readv_full may have short reads, keeping calling it
> +         * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
> +         */
> +        rc = qio_channel_readv_full(ioc, &iov, 1, &fds_t, &nfds_t, &local_err);
> +        if (rc < 0) {
> +            if (rc == QIO_CHANNEL_ERR_BLOCK) {
> +                qio_channel_yield(ioc, G_IO_IN);
> +                continue;
> +            } else {
> +                error_report_err(local_err);
> +                return false;
> +            }
> +        }
> +        read_bytes += rc;
> +        if (nfds_t > 0) {
> +            fds = g_renew(int, fds, nfds + nfds_t);
> +            memcpy(fds + nfds, fds_t, nfds_t * sizeof(int));
> +            nfds += nfds_t;
> +            if (nfds > VHOST_MEMORY_MAX_NREGIONS) {
> +                error_report("A maximum of %d fds are allowed, "
> +                             "however got %lu fds now",
> +                             VHOST_MEMORY_MAX_NREGIONS, nfds);
> +                goto fail;
> +            }
> +            g_free(fds_t);
I'm not sure why the temporary fds[] array is necessary. Copying the
fds directly into vmsg->fds would be simpler:

  if (nfds + nfds_t > G_N_ELEMENTS(vmsg->fds)) {
      error_report("A maximum of %d fds are allowed, "
                   "however got %lu fds now",
                   VHOST_MEMORY_MAX_NREGIONS, nfds + nfds_t);
      goto fail;
  }
  memcpy(vmsg->fds + nfds, fds_t, nfds_t * sizeof(vmsg->fds[0]));
  nfds += nfds_t;

Did I misunderstand how this works?

> +        }
> +        if (read_bytes == VHOST_USER_HDR_SIZE || rc == 0) {
> +            break;
> +        }
> +        iov.iov_base = (char *)vmsg + read_bytes;
> +        iov.iov_len = VHOST_USER_HDR_SIZE - read_bytes;
> +    } while (true);
> +
> +    vmsg->fd_num = nfds;
> +    if (nfds > 0) {
> +        memcpy(vmsg->fds, fds, nfds * sizeof(int));
> +    }
> +    g_free(fds);
> +    /* qio_channel_readv_full will make socket fds blocking, unblock them */
> +    vmsg_unblock_fds(vmsg);
> +    if (vmsg->size > sizeof(vmsg->payload)) {
> +        error_report("Error: too big message request: %d, "
> +                     "size: vmsg->size: %u, "
> +                     "while sizeof(vmsg->payload) = %zu",
> +                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
> +        goto fail;
> +    }
> +
> +    struct iovec iov_payload = {
> +        .iov_base = (char *)&vmsg->payload,
> +        .iov_len = vmsg->size,
> +    };
> +    if (vmsg->size) {
> +        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
> +        if (rc == -1) {
> +            error_report_err(local_err);
> +            goto fail;
> +        }
> +    }
> +
> +    return true;
> +
> +fail:
> +    vmsg_close_fds(vmsg);
> +
> +    return false;
> +}
> +
> +
> +static void vu_client_start(VuServer *server);
> +static coroutine_fn void vu_client_trip(void *opaque)
> +{
> +    VuServer *server = opaque;
> +
> +    while (!server->aio_context_changed && server->sioc) {
> +        vu_dispatch(&server->vu_dev);
> +    }
> +
> +    if (server->aio_context_changed && server->sioc) {
> +        server->aio_context_changed = false;
> +        vu_client_start(server);
> +    }
> +}
> +
> +static void vu_client_start(VuServer *server)
> +{
> +    server->co_trip = qemu_coroutine_create(vu_client_trip, server);
> +    aio_co_enter(server->ctx, server->co_trip);
> +}
> +
> +/*
> + * a wrapper for vu_kick_cb
> + *
> + * since aio_dispatch can only pass one user data pointer to the
> + * callback function, pack VuDev and pvt into a struct. Then unpack it
> + * and pass them to vu_kick_cb
> + */
> +static void kick_handler(void *opaque)
> +{
> +    KickInfo *kick_info = opaque;
> +    kick_info->cb(kick_info->vu_dev, 0, (void *) kick_info->index);
> +}
> +
> +
> +static void
> +set_watch(VuDev *vu_dev, int fd, int vu_evt,
> +          vu_watch_cb cb, void *pvt)
> +{
> +
> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> +    g_assert(vu_dev);
> +    g_assert(fd >= 0);
> +    long index = (intptr_t) pvt;
> +    g_assert(cb);
> +    KickInfo *kick_info = &server->kick_info[index];
> +    if (!kick_info->cb) {
> +        kick_info->fd = fd;
> +        kick_info->cb = cb;
> +        qemu_set_nonblock(fd);
> +        aio_set_fd_handler(server->ioc->ctx, fd, false, kick_handler,
> +                           NULL, NULL, kick_info);
> +        kick_info->vu_dev = vu_dev;
> +    }
> +}
> +
> +
> +static void remove_watch(VuDev *vu_dev, int fd)
> +{
> +    VuServer *server;
> +    int i;
> +    int index = -1;
> +    g_assert(vu_dev);
> +    g_assert(fd >= 0);
> +
> +    server = container_of(vu_dev, VuServer, vu_dev);
> +    for (i = 0; i < vu_dev->max_queues; i++) {
> +        if (server->kick_info[i].fd == fd) {
> +            index = i;
> +            break;
> +        }
> +    }
> +
> +    if (index == -1) {
> +        return;
> +    }
> +    server->kick_info[i].cb = NULL;
> +    aio_set_fd_handler(server->ioc->ctx, fd, false, NULL, NULL, NULL, NULL);
> +}
> +
> +
> +static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
> +                      gpointer opaque)
> +{
> +    VuServer *server = opaque;
> +
> +    if (server->sioc) {
> +        warn_report("Only one vhost-user client is allowed to "
> +                    "connect the server one time");
> +        return;
> +    }
> +
> +    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
> +                 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
> +        error_report("Failed to initialized libvhost-user");
> +        return;
> +    }
> +
> +    /*
> +     * Unset the callback function for network listener to make another
> +     * vhost-user client keeping waiting until this client disconnects
> +     */
> +    qio_net_listener_set_client_func(server->listener,
> +                                     NULL,
> +                                     NULL,
> +                                     NULL);
> +    server->sioc = sioc;
> +    server->kick_info = g_new0(KickInfo, server->max_queues);

Where is kick_info freed?

> +    /*
> +     * Increase the object reference, so cioc will not freed by

s/cioc/sioc/

> +     * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc))
> +     */
> +    object_ref(OBJECT(server->sioc));
> +    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
> +    server->ioc = QIO_CHANNEL(sioc);
> +    object_ref(OBJECT(server->ioc));
> +    object_ref(OBJECT(sioc));

Why are there two object_refs for sioc and where is unref called?

> +    qio_channel_attach_aio_context(server->ioc, server->ctx);
> +    qio_channel_set_blocking(QIO_CHANNEL(server->sioc), false, NULL);
> +    vu_client_start(server);
> +}
> +
> +
> +void vhost_user_server_stop(VuServer *server)
> +{
> +    if (!server) {
> +        return;
> +    }
> +
> +    if (server->sioc) {
> +        close_client(server);
> +        object_unref(OBJECT(server->sioc));

This call is object_unref(NULL) since close_client() does
server->sioc = NULL.
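
One way to fix it (just an untested sketch, assuming close_client()
keeps clearing server->sioc) is to grab the pointer before it is
cleared:

  if (server->sioc) {
      QIOChannelSocket *sioc = server->sioc;

      close_client(server);
      object_unref(OBJECT(sioc));
  }

Alternatively, close_client() could drop the reference itself when it
clears the field.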
> +    }
> +
> +    if (server->listener) {
> +        qio_net_listener_disconnect(server->listener);
> +        object_unref(OBJECT(server->listener));
> +    }
> +}
> +
> +static void detach_context(VuServer *server)
> +{
> +    int i;
> +    AioContext *ctx = server->ioc->ctx;
> +    qio_channel_detach_aio_context(server->ioc);
> +    for (i = 0; i < server->vu_dev.max_queues; i++) {
> +        if (server->kick_info[i].cb) {
> +            aio_set_fd_handler(ctx, server->kick_info[i].fd, false, NULL,
> +                               NULL, NULL, NULL);
> +        }
> +    }
> +}
> +
> +static void attach_context(VuServer *server, AioContext *ctx)
> +{
> +    int i;
> +    qio_channel_attach_aio_context(server->ioc, ctx);
> +    server->aio_context_changed = true;
> +    if (server->co_trip) {
> +        aio_co_schedule(ctx, server->co_trip);
> +    }
> +    for (i = 0; i < server->vu_dev.max_queues; i++) {
> +        if (server->kick_info[i].cb) {
> +            aio_set_fd_handler(ctx, server->kick_info[i].fd, false,
> +                               kick_handler, NULL, NULL,
> +                               &server->kick_info[i]);
> +        }
> +    }
> +}
> +
> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server)
> +{
> +    server->ctx = ctx ? ctx : qemu_get_aio_context();
> +    if (!server->sioc) {
> +        return;
> +    }
> +    if (ctx) {
> +        attach_context(server, ctx);
> +    } else {
> +        detach_context(server);
> +    }
> +}
> +
> +
> +bool vhost_user_server_start(uint16_t max_queues,
> +                             SocketAddress *socket_addr,
> +                             AioContext *ctx,
> +                             VuServer *server,
> +                             void *device_panic_notifier,
> +                             const VuDevIface *vu_iface,
> +                             Error **errp)
> +{
> +    server->listener = qio_net_listener_new();
> +    if (qio_net_listener_open_sync(server->listener, socket_addr, 1,
> +                                   errp) < 0) {
> +        goto error;
> +    }
> +
> +    qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");
> +
> +    server->vu_iface = vu_iface;
> +    server->max_queues = max_queues;
> +    server->ctx = ctx;
> +    server->device_panic_notifier = device_panic_notifier;
> +    qio_net_listener_set_client_func(server->listener,
> +                                     vu_accept,
> +                                     server,
> +                                     NULL);

The qio_net_listener_set_client_func() call uses the default
GMainContext but we have an AioContext *ctx argument. This is
surprising. I would expect the socket to be handled in the AioContext.
Can you clarify how this should work?

> +
> +    return true;
> +error:
> +    g_free(server);

It's surprising that this function frees the server argument when an
error occurs. vhost_user_server_stop() does not free server. I
suggest letting the caller free server since they own the object.
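
In other words, something like this (a sketch; whether the listener
still needs to be unreffed here is my assumption about the remaining
cleanup):

  error:
      object_unref(OBJECT(server->listener));
      server->listener = NULL;
      return false;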
> +    return false;
> +}
> diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h
> new file mode 100644
> index 0000000000..4315556b66
> --- /dev/null
> +++ b/util/vhost-user-server.h
> @@ -0,0 +1,59 @@
> +/*
> + * Sharing QEMU devices via vhost-user protocol
> + *
> + * Author: Coiby Xu <coiby...@gmail.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later. See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef VHOST_USER_SERVER_H
> +#define VHOST_USER_SERVER_H
> +
> +#include "contrib/libvhost-user/libvhost-user.h"
> +#include "io/channel-socket.h"
> +#include "io/channel-file.h"
> +#include "io/net-listener.h"
> +#include "qemu/error-report.h"
> +#include "qapi/error.h"
> +#include "standard-headers/linux/virtio_blk.h"
> +
> +typedef struct KickInfo {
> +    VuDev *vu_dev;
> +    int fd; /*kick fd*/
> +    long index; /*queue index*/
> +    vu_watch_cb cb;
> +} KickInfo;
> +
> +typedef struct VuServer {
> +    QIONetListener *listener;
> +    AioContext *ctx;
> +    void (*device_panic_notifier)(struct VuServer *server) ;
> +    int max_queues;
> +    const VuDevIface *vu_iface;
> +    VuDev vu_dev;
> +    QIOChannel *ioc; /* The I/O channel with the client */
> +    QIOChannelSocket *sioc; /* The underlying data channel with the client */
> +    /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */
> +    QIOChannel *ioc_slave;
> +    QIOChannelSocket *sioc_slave;
> +    Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
> +    KickInfo *kick_info; /* an array with the length of the queue number */
> +    /* restart coroutine co_trip if AIOContext is changed */
> +    bool aio_context_changed;
> +} VuServer;
> +
> +
> +bool vhost_user_server_start(uint16_t max_queues,
> +                             SocketAddress *unix_socket,
> +                             AioContext *ctx,
> +                             VuServer *server,
> +                             void *device_panic_notifier,

Please declare the function pointer type:

  typedef void DevicePanicNotifierFn(struct VuServer *server);

Then the argument list can use DevicePanicNotifierFn
*device_panic_notifier instead of void *.

> +                             const VuDevIface *vu_iface,
> +                             Error **errp);
> +
> +void vhost_user_server_stop(VuServer *server);
> +
> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server);

If you send another revision, please make VuServer *server the first
argument of vhost_user_server_start() and
vhost_user_server_set_aio_context(). Functions usually have the object
they act on as the first argument.
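
Taken together with the typedef suggestion above, the prototypes might
then look like this (just a sketch):

  typedef void DevicePanicNotifierFn(struct VuServer *server);

  bool vhost_user_server_start(VuServer *server,
                               uint16_t max_queues,
                               SocketAddress *unix_socket,
                               AioContext *ctx,
                               DevicePanicNotifierFn *device_panic_notifier,
                               const VuDevIface *vu_iface,
                               Error **errp);

  void vhost_user_server_set_aio_context(VuServer *server, AioContext *ctx);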